In [1]:
from pyspark import SparkConf, SparkContext

# Local single-JVM Spark context for this notebook.
# getOrCreate reuses a context that is already running, so re-executing this
# cell does not fail because a second SparkContext cannot be started.
spark_conf = SparkConf().setMaster("local").setAppName("Simple App")
sc = SparkContext.getOrCreate(spark_conf)

sc

## Initialisieren des SQLContext-Objektes und Laden der Uber-Daten

In [3]:
# SQLContext lives in pyspark.sql; without this import the cell raises a
# NameError on a fresh kernel.
from pyspark.sql import SQLContext

# Entry point for DataFrame and SQL functionality, bound to the context `sc`.
sqlContext = SQLContext(sc)

In [4]:
# Load the Uber CSV with Spark's built-in CSV reader (no external
# spark-csv package name needed). The first line is used as the header and
# column types are inferred from the data.
df = sqlContext.read.csv(
    'data/Uber-Jan-Feb-FOIL.csv',
    header=True,
    inferSchema=True,
)

df

DataFrame[dispatching_base_number: string, date: string, active_vehicles: int, trips: int]

## Ausgabe des automatisch generierten Schemas

In [6]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)



## Mit DataFrames arbeiten

In [122]:
# Show summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+-----------------------+--------+------------------+------------------+
|summary|dispatching_base_number|    date|   active_vehicles|             trips|
+-------+-----------------------+--------+------------------+------------------+
|  count|                    354|     354|               354|               354|
|   mean|                   null|    null|1307.4350282485875|11667.316384180791|
| stddev|                   null|    null|1162.5106256246545|10648.284864561118|
|    min|                 B02512|1/1/2015|               112|               629|
|    max|                 B02765|2/9/2015|              4395|             45858|
+-------+-----------------------+--------+------------------+------------------+



In [79]:
# Approximate 1 % / 50 % / 99 % quantiles of vehicles and trips,
# computed with a relative error of 0.05.
df.approxQuantile(
    col=['active_vehicles', 'trips'],
    probabilities=[0.01, 0.5, 0.99],
    relativeError=0.05,
)

[[112.0, 1072.0, 4395.0], [629.0, 9546.0, 45858.0]]

In [153]:
# Maximum of active vehicles and trips for each dispatching base.
(
    df.groupBy('dispatching_base_number')
    .max('active_vehicles', 'trips')
    .show()
)

+-----------------------+--------------------+----------+
|dispatching_base_number|max(active_vehicles)|max(trips)|
+-----------------------+--------------------+----------+
|                 B02512|                 281|      2408|
|                 B02598|                1216|     13062|
|                 B02682|                1523|     16448|
|                 B02765|                 786|      7824|
|                 B02617|                1590|     16999|
|                 B02764|                4395|     45858|
+-----------------------+--------------------+----------+



In [74]:
# List every dispatching base that occurs in the data exactly once.
(
    df.select("dispatching_base_number")
    .distinct()
    .show()
)

+-----------------------+
|dispatching_base_number|
+-----------------------+
|                 B02512|
|                 B02598|
|                 B02682|
|                 B02765|
|                 B02617|
|                 B02764|
+-----------------------+



## Aufgabe: Umstrukturieren nach Datum

Die Fahrten liegen bisher pro Basis und Tag vor; wir wollen sie jetzt nur noch pro Tag aggregieren. Zusätzlich sollen die Spalten danach mit `withColumnRenamed` wieder umbenannt werden.

In [169]:
# Reference solution for the exercise: collapse the per-base rows into one row
# per date by summing, then rename the generated `sum(...)` columns back to
# their original names. The following cells depend on `data_per_day`, so this
# definition must run for the notebook to execute top to bottom.
data_per_day = df.groupBy('date').sum('active_vehicles', 'trips') \
    .withColumnRenamed('sum(active_vehicles)', 'active_vehicles') \
    .withColumnRenamed('sum(trips)', 'trips')

In [161]:
# Correlation between the daily number of active vehicles and daily trips.
data_per_day.corr(col1='active_vehicles', col2='trips')

0.8496277378382892

In [162]:
# Approximate 1 % / 50 % / 99 % quantiles of the per-day totals,
# computed with a relative error of 0.05.
data_per_day.approxQuantile(
    col=['active_vehicles', 'trips'],
    probabilities=[0.01, 0.5, 0.99],
    relativeError=0.05,
)

[[3496.0, 7999.0, 9649.0], [25244.0, 70296.0, 100915.0]]

In [163]:
# Average number of trips per active vehicle across all days.
(
    data_per_day
    .select((data_per_day['trips'] / data_per_day['active_vehicles']).alias('trips_per_car'))
    .agg({'trips_per_car': 'mean'})
    .show()
)

+------------------+
|avg(trips_per_car)|
+------------------+
| 8.845142619763957|
+------------------+



In [151]:
# Average number of trips per active vehicle, broken down by dispatching base.
(
    df.select(
        'dispatching_base_number',
        (df['trips'] / df['active_vehicles']).alias('trips_per_car'),
    )
    .groupBy('dispatching_base_number')
    .mean('trips_per_car')
    .show()
)

+-----------------------+------------------+
|dispatching_base_number|avg(trips_per_car)|
+-----------------------+------------------+
|                 B02512| 7.073430660588635|
|                 B02598| 9.133921849595998|
|                 B02682| 9.161441123761087|
|                 B02765| 8.105403794986351|
|                 B02617| 9.024466408204646|
|                 B02764| 8.754156172824018|
+-----------------------+------------------+



## Mit SparkSQL arbeiten

In [7]:
# Expose the DataFrame to SQL queries under the name "uber".
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites an existing view of the same name.
df.createOrReplaceTempView("uber")

In [70]:
# Same distinct-base listing as above, this time expressed as a SQL query
# against the "uber" table.
distinct_bases = sqlContext.sql(
    "select distinct dispatching_base_number from uber"
)
distinct_bases.show()

+-----------------------+
|dispatching_base_number|
+-----------------------+
|                 B02512|
|                 B02598|
|                 B02682|
|                 B02765|
|                 B02617|
|                 B02764|
+-----------------------+



In [168]:
# Total number of trips per dispatching base, largest total first.
sqlContext.sql("""select dispatching_base_number, sum(trips) as cnt 
                                from uber
                                group by dispatching_base_number 
                                order by cnt desc""").show()

+-----------------------+-------+
|dispatching_base_number|    cnt|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+



In [167]:
# The five dates with the highest total number of trips across all bases.
sqlContext.sql("""select date, sum(trips) as trips 
                    from uber 
                    group by date 
                    order by trips desc 
                    limit 5""").show()

+---------+------+
|     date| trips|
+---------+------+
|2/20/2015|100915|
|2/14/2015|100345|
|2/21/2015| 98380|
|2/13/2015| 98024|
|1/31/2015| 92257|
+---------+------+



## Aufgabe: Informationen sammeln mit SQL

1. Erfolgreichster Tag
2. Durchschnittliche Trips pro Basis
3. Tag mit geringster Auslastung (`trips / active_vehicles`)



In [None]:
# Shut down the SparkContext at the end of the notebook.
sc.stop()