# BDA Project
- Romain Claret
- Jämes Ménétrey
- Damien Rochat

### Load PySpark

In [1]:
import os
import findspark

# findspark.init() resolves the Spark installation (and the pyspark package it
# puts on sys.path) from SPARK_HOME, so the variable must be set *before* the
# call; assigning it afterwards can leave SPARK_HOME and the imported pyspark
# pointing at different installations.
os.environ["SPARK_HOME"] = "/opt/spark"
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf

# Alternative kept for reference: memory can also be passed through
# PYSPARK_SUBMIT_ARGS, e.g. ' --driver-memory 4g pyspark-shell'
# (optionally with --driver-maxResultSize 10g --executor-memory 4g).

# Single local-mode context using every available core; memory limits and
# timeouts are raised so long-running jobs on the full dataset are not aborted.
conf = (
    SparkConf()
    .setAppName("bda-spark-fare")
    .setMaster('local[*]')
    .set('spark.executor.memory', '10g')
    .set('spark.driver.memory', '10g')
    .set('spark.driver.maxResultSize', '10g')
    .set('spark.network.timeout', '1000000000')
    .set('spark.executor.heartbeatInterval', '1000000000')
)
sc = SparkContext(conf=conf)

### Check config

In [2]:
# List every (key, value) setting of the running context through the public
# accessor instead of reaching into the private `_conf` attribute.
sc.getConf().getAll()

[('spark.executor.memory', '10g'),
 ('spark.driver.port', '40399'),
 ('spark.driver.memory', '10g'),
 ('spark.driver.host', 'rclaret.tic.heia-fr.ch'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'bda-spark-fare'),
 ('spark.driver.maxResultSize', '10g'),
 ('spark.network.timeout', '1000000000'),
 ('spark.executor.heartbeatInterval', '1000000000'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1559849514228'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

### Check that Spark is working with a small Monte Carlo estimate of π

In [3]:
import random
num_samples = 100000
def inside(p):
    """Draw one random point in the unit square and test it against the unit circle.

    The argument `p` is ignored; it only exists so the function can be used
    as a per-element predicate. Returns True when the drawn point (x, y)
    satisfies x^2 + y^2 < 1.
    """
    x = random.random()
    y = random.random()
    squared_radius = x*x + y*y
    return squared_radius < 1
# Run the predicate once per sample on Spark and count how many draws fell
# inside the circle; the hit ratio times 4 estimates pi.
samples = sc.parallelize(range(0, num_samples))
count = samples.filter(inside).count()
pi = 4 * count / num_samples
print(pi)

3.14152


### Create the SQL context

In [4]:
# SQLContext built on top of the SparkContext `sc`; the cells below use it
# for reading the CSV files (sqlContext.read) and for building small
# DataFrames by hand (sqlContext.createDataFrame).
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

### Filter the Dataset

In [5]:
def df_maker_fare(fare_path):
    """Load one trip-fare CSV and reduce it to a single `fare_total` column.

    The file is read with a header row and "," as delimiter. Every listed
    bookkeeping column is dropped (most header names are matched with a
    leading space), " fare_amount" and " surcharge" are renamed without
    the space, and the result keeps fare_total = fare_amount + surcharge
    in place of the two source columns.
    """
    unwanted = (
        "medallion",
        " hack_license",
        "hack_license",
        " vendor_id",
        " pickup_datetime",
        " payment_type",
        " tip_amount",
        " tolls_amount",
        " total_amount",
        " mta_tax",
    )
    reader = (sqlContext.read.format("csv")
              .option("delimiter", ",")
              .option("header", "true"))
    df_fare = reader.load(fare_path)
    for name in unwanted:
        df_fare = df_fare.drop(name)
    # Strip the leading space from the two columns that are kept.
    for padded in (" fare_amount", " surcharge"):
        df_fare = df_fare.withColumnRenamed(padded, padded.strip())

    total = df_fare.fare_amount + df_fare.surcharge
    with_total = df_fare.withColumn("fare_total", total)
    return with_total.drop("fare_amount").drop("surcharge")

#df_fare = df_maker_fare("datasets/trip_fare/trip_fare_1.csv")
#
#df_fare.printSchema()

In [5]:
def df_maker_no_fare_filter(fare_path):
    """Load one trip-fare CSV and keep only `fare_amount` and `surcharge`.

    The file is read with a header row and "," as delimiter. Every listed
    bookkeeping column is dropped (most header names are matched with a
    leading space) and " fare_amount" / " surcharge" are renamed without
    the space. No row filtering and no arithmetic is applied here.
    """
    unwanted = (
        "medallion",
        " hack_license",
        "hack_license",
        " vendor_id",
        " pickup_datetime",
        " payment_type",
        " tip_amount",
        " tolls_amount",
        " total_amount",
        " mta_tax",
    )
    reader = (sqlContext.read.format("csv")
              .option("delimiter", ",")
              .option("header", "true"))
    fares = reader.load(fare_path)
    for name in unwanted:
        fares = fares.drop(name)
    # Strip the leading space from the two columns that are kept.
    for padded in (" fare_amount", " surcharge"):
        fares = fares.withColumnRenamed(padded, padded.strip())
    return fares

#df_fare = df_maker_no_fare_filter("datasets/trip_fare/trip_fare_1.csv")
#
#df_fare.printSchema()

In [6]:
path_fare = "datasets/trip_fare/"

# Only CSV files are fare data; any other directory entry (hidden files,
# checkpoints, ...) would otherwise be parsed as CSV and unioned in.
fare_names = [name for name in os.listdir(path_fare) if name.endswith(".csv")]
if not fare_names:
    raise FileNotFoundError("no .csv files found in " + path_fare)

for idx,e in enumerate(fare_names):
    print(e)
    # Always bind the per-file frame so later cells can use it even when the
    # directory holds a single file; it ends up holding the last file loaded.
    df_no_fare_filter = df_maker_no_fare_filter(path_fare+e)
    if idx == 0:
        dff_no_fare_filter = df_no_fare_filter
    else:
        dff_no_fare_filter = dff_no_fare_filter.union(df_no_fare_filter)
dff_no_fare_filter.printSchema()
dff_no_fare_filter.show(2)

trip_fare_9.csv
trip_fare_7.csv
trip_fare_1.csv
trip_fare_11.csv
trip_fare_3.csv
trip_fare_10.csv
trip_fare_4.csv
trip_fare_8.csv
trip_fare_5.csv
trip_fare_12.csv
trip_fare_6.csv
trip_fare_2.csv
root
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)

+-----------+---------+
|fare_amount|surcharge|
+-----------+---------+
|       11.5|        0|
|       14.5|        0|
+-----------+---------+
only showing top 2 rows



In [8]:
# "partial" is the frame of the last file loaded by the loop above;
# "full" is the union of all loaded files. Neither has been filtered yet.
print("no fare filter partial:",df_no_fare_filter.count())
print("no fare filter full:",dff_no_fare_filter.count())

no fare filter partial: 13990176
no fare filter full: 173179759


In [12]:
# fare_amount is still a string column at this point (see the printSchema
# output above). Cast it explicitly so the "> 0" test is a numeric comparison
# on the full decimal value instead of depending on whichever implicit
# string-vs-integer coercion Spark applies.
dff_fare_filtered = (
    dff_no_fare_filter
    .na.drop()
    .filter(dff_no_fare_filter.fare_amount.cast("double") > 0)
)

In [13]:
print("fare filter full:",dff_fare_filtered.count())

fare filter full: 173171082


In [None]:
# The labels announce *filtered* counts, so count filtered frames: the
# per-file frame gets the same null/positive-fare filter applied here, and
# the full count uses the already filtered dff_fare_filtered.
df_fare_filtered = (
    df_no_fare_filter
    .na.drop()
    .filter(df_no_fare_filter.fare_amount.cast("double") > 0)
)
print("with fare filter partial:",df_fare_filtered.count())
print("with fare filter full:",dff_fare_filtered.count())

In [6]:
# Latitude/longitude bounding box used by df_maker_data to keep only pickups
# that fall inside it (presumably the New York City area, judging by the
# `ny_` prefix — TODO confirm the intended extent).
ny_lat_min = 40.50214590272583
# 40.9 is the bound in use; the value in the trailing comment is a tighter
# limit that was replaced.
ny_lat_max = 40.9#40.75977082462501
ny_lon_min = -74.24354116993825
ny_lon_max = -73.77490985242169
    
def df_maker_data(data_path):
    """Load one trip-data CSV and keep in-bounds pickup coordinates as floats.

    The file is read with a header row and "," as delimiter. Every column
    except the pickup longitude/latitude is dropped (each name is tried with
    and without a leading space), the two remaining columns are renamed
    without the leading space, rows whose pickup point lies outside the
    [ny_lat_min, ny_lat_max] x [ny_lon_min, ny_lon_max] box (bounds
    inclusive) are removed, and both columns are cast to 'float'.

    Rows whose coordinates cannot be read as numbers compare as null and are
    therefore removed by the bounds filter as well.
    """
    dropped_fields = (
        "hack_license",
        "vendor_id",
        "rate_code",
        "store_and_fwd_flag",
        "pickup_datetime",
        "dropoff_datetime",
        "passenger_count",
        "trip_time_in_secs",
        "trip_distance",
        "dropoff_longitude",
        "dropoff_latitude",
    )
    df_data = (sqlContext.read.format("csv")
               .option("delimiter", ",")
               .option("header", "true")
               .load(data_path)
               .drop("medallion"))
    for field in dropped_fields:
        # Header names may or may not carry a leading space.
        df_data = df_data.drop(" " + field).drop(field)
    df_data = (df_data
               .withColumnRenamed(" pickup_longitude", "pickup_longitude")
               .withColumnRenamed(" pickup_latitude", "pickup_latitude"))

    # Filter first, on columns that belong to the frame being filtered, with
    # an explicit numeric cast. The previous version applied the filters after
    # the float cast while still referencing the pre-cast string columns of
    # `df_data`, which only worked through Spark's implicit resolution.
    latitude = df_data["pickup_latitude"].cast("double")
    longitude = df_data["pickup_longitude"].cast("double")
    in_bounds = (df_data
                 .filter(latitude.between(ny_lat_min, ny_lat_max))
                 .filter(longitude.between(ny_lon_min, ny_lon_max)))

    # Convert the string coordinates to float for the output frame.
    return (in_bounds
            .withColumn('pickup_longitude', df_data['pickup_longitude'].cast('float'))
            .withColumn('pickup_latitude', df_data['pickup_latitude'].cast('float')))

#df_data = df_maker_data("datasets/trip_data/trip_data_1.csv")
#df_data.printSchema()

In [7]:
#df_fare.count()

In [8]:
#df_data.count()

In [9]:
from pyspark.sql.functions import monotonically_increasing_id
def df_combiner(df_fare, df_data):
    """Join a fare frame and a trip-data frame on a generated row id.

    Each input gets its own "id" column from monotonically_increasing_id(),
    the two frames are full-outer-joined on that column, and the helper
    column is dropped from the result.

    NOTE(review): the ids are generated independently for each frame, so the
    join only pairs a fare with its own trip if both frames contain the same
    records in the same order and partitioning. In this notebook
    df_maker_data() removes out-of-bounds rows before the frame reaches this
    function, so that assumption looks violated — verify, and consider
    joining on real keys instead.
    NOTE(review): per the Spark documentation the generated ids are unique
    and increasing but not consecutive and depend on partitioning — confirm
    for the Spark version in use.
    """
    # Independent id sequence for each side.
    df_fare_id = df_fare.withColumn("id", monotonically_increasing_id())
    df_data_id = df_data.withColumn("id", monotonically_increasing_id())
    # Outer join: an id present on only one side still yields a row, with
    # nulls in the other side's columns.
    return df_fare_id.join(df_data_id, "id", "outer").drop("id")
#df_merged = df_combiner(df_fare, df_data)
#df_merged.printSchema()
#df_merged.show(1)

In [16]:
from math import ceil

def distribute_cost(df):
    """Expand every trip into one row per started fare unit.

    For a row with fare_total F, ceil(F) - 1 extra rows are appended, each
    with fare_total 1 and the same pickup coordinates, so a trip ends up
    represented by ceil(F) rows (the original row keeps its real fare_total).

    `df` must expose fare_total, pickup_longitude and pickup_latitude, in
    that order, because the extra rows are appended with a positional union.

    Differences from the previous implementation:
    - the extra rows are generated inside Spark with a flatMap instead of
      building one single-row DataFrame and one union per fare unit on the
      driver, which made the plan grow without bound on real data;
    - no hard-coded seed row is added to the result any more;
    - rows with a null fare_total (possible after the outer join in
      df_combiner) produce no extra rows instead of raising in ceil().
    """
    # Local import: the notebook does not import pyspark.sql.types elsewhere.
    from pyspark.sql.types import DoubleType, StructField, StructType

    columns = ['fare_total', 'pickup_longitude', 'pickup_latitude']
    unit_schema = StructType(
        [StructField(name, DoubleType(), True) for name in columns]
    )

    def _as_float(value):
        # Keep nulls, coerce everything else so it fits the double schema.
        return None if value is None else float(value)

    def _unit_rows(row):
        if row.fare_total is None:
            return []
        extra = ceil(row.fare_total) - 1
        unit = (1.0, _as_float(row.pickup_longitude), _as_float(row.pickup_latitude))
        return [unit] * max(extra, 0)

    # An explicit schema keeps createDataFrame working when no extra rows
    # are produced at all.
    df_units = sqlContext.createDataFrame(df.rdd.flatMap(_unit_rows), unit_schema)
    return df.union(df_units)

### Create complete Dataframe from all Dataset files

In [17]:
path_trip = "datasets/trip_data/"
path_fare = "datasets/trip_fare/"
path_backups = "backups/"
columns = ['fare_total', 'pickup_longitude', 'pickup_latitude']

fare_files = os.listdir(path_fare)

# The loop addresses files by number (trip_data_1.csv, trip_fare_1.csv, ...),
# so only the number of real CSV files matters; counting every directory
# entry would make the loop try to read files that do not exist.
n_trip_files = sum(1 for name in os.listdir(path_trip) if name.endswith(".csv"))

for idx in range(n_trip_files):
    trip_file = path_trip+"trip_data_"+str(idx+1)+".csv"
    fare_file = path_fare+"trip_fare_"+str(idx+1)+".csv"
    # Backup names are unchanged: the first file is written under the
    # "trip-dff-" prefix, every later file under "trip-df-".
    prefix = "trip-dff-" if idx == 0 else "trip-df-"

    print("start: " + trip_file)
    df = df_combiner(df_maker_fare(fare_file),
                     df_maker_data(trip_file))
    print("writes init parquet")
    df.write.parquet(path_backups+prefix+str(idx)+".parquet")
    print("distributes cost")
    df = distribute_cost(df)
    print("writes distributed parquet")
    df.write.parquet(path_backups+prefix+"dist-"+str(idx)+".parquet")
    if idx == 0:
        # The first distributed frame seeds the accumulated result.
        dff = df
    else:
        print("unions")
        dff = dff.union(df)
        print("writes master distributed parquet")
        dff.write.parquet(path_backups+"trip-dff-dist-"+str(idx)+".parquet")
    print("end: " + trip_file)

#dff.show(2)

start: datasets/trip_data/trip_data_1.csv
writes init parquet
distributes cost


KeyboardInterrupt: 

#### Count of the whole dataset

In [10]:
#original: 173264090
dff.count()

173264090

#### Row count of the last dataset file loaded

In [11]:
#original 7774669
df.count()

13977692

In [14]:
df.limit(2).show()

+----------+----------------+---------------+
|fare_total|pickup_longitude|pickup_latitude|
+----------+----------------+---------------+
|       6.5|      -73.977776|      40.758053|
|      12.0|       -73.95221|       40.77743|
+----------+----------------+---------------+



In [None]:
# Print only the first row. take(1) fetches a single row, whereas collect()
# would first pull the entire DataFrame onto the driver.
for i in df.take(1):
    print(i)

In [51]:
#df_small = df.limit(10)
#df_small = df.head(10)
#df_small.show()
#type(df_small)

pyspark.sql.dataframe.DataFrame

In [67]:
# Try distribute_cost on a tiny hand-made frame: the two rows are the ones
# shown by df.limit(2).show() above.
columns = ['fare_total', 'pickup_longitude', 'pickup_latitude']
vals = [(6.5, -73.977776, 40.758053), (12.0, -73.95221, 40.77743)]
df_tmp = sqlContext.createDataFrame(vals, columns)

# df_tmp is rebound to the expanded result.
df_tmp = distribute_cost(df_tmp)
df_tmp.show()

+----------+----------------+---------------+
|fare_total|pickup_longitude|pickup_latitude|
+----------+----------------+---------------+
|       6.5|      -73.977776|      40.758053|
|      12.0|       -73.95221|       40.77743|
|       1.0|      -73.977776|      40.758053|
|       1.0|      -73.977776|      40.758053|
|       1.0|      -73.977776|      40.758053|
|       1.0|      -73.977776|      40.758053|
|       1.0|      -73.977776|      40.758053|
|       1.0|      -73.977776|      40.758053|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|       40.77743|
|       1.0|       -73.95221|     

In [None]:
# NOTE(review): scratch cell, never executed. `col1` / `col2` look like
# placeholders — the recorded schema of df only has fare_total,
# pickup_longitude and pickup_latitude — so confirm the intended columns.
df.where("col1>col2")

In [23]:
# Indexing a DataFrame column yields a Column expression, not a cell value
# (see the recorded output below).
df["fare_total"][0]

Column<b'fare_total[0]'>

In [19]:
# Positional indexing on a DataFrame also returns a Column (see the recorded
# output below), not a row.
type(df[0])

pyspark.sql.column.Column

In [16]:
# The cell was left as an incomplete expression (`df.`), which is a syntax
# error; the recorded output shows the attribute inspected was `summary`.
df.summary

<bound method DataFrame.summary of DataFrame[fare_total: double, pickup_longitude: float, pickup_latitude: float]>

## Playing with a smaller dataset: 1/9

### Distribute cost 

In [None]:

columns = ['fare_total', 'pickup_longitude', 'pickup_latitude']

vals = [(1, 2, 0), (2, 0, 1)]

# `spark` (a SparkSession) is never created in this notebook, so the frames
# are built with the existing sqlContext, as in the other cells.
df = sqlContext.createDataFrame(vals, columns)

# Appending a row means building a one-row frame and unioning it in.
newRow = sqlContext.createDataFrame([(4,5,7)], columns)
appended = df.union(newRow)
appended.show()