In [None]:
pip install findspark

In [None]:
pip install pyspark

In [18]:
import pyspark
from pyspark.sql import SparkSession
#SparkSession.builder
 #    .master("local")
  #   .appName("Word Count")
   #  .config("spark.some.config.option", "some-value")
   #  .getOrCreate()

spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello_spark ")
df.show()

+-----------+
|hello_spark|
+-----------+
|      spark|
+-----------+



In [19]:
spark

In [95]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sql_sc = SQLContext(spark)
repository='C:/Users/adam_/Desktop/Adam_Work_Spark/trips_spark/Shared_Micromobility_Vehicle_2019_Trips.csv'
Austin_Trips_Df = spark.read.csv(repository,header='True',inferSchema='true') 


In [96]:
Austin_Trips_Df.count()

2198485

In [97]:
Austin_Trips_Df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Device ID: string (nullable = true)
 |-- Vehicle Type: string (nullable = true)
 |-- Trip Duration: integer (nullable = true)
 |-- Trip Distance: integer (nullable = true)
 |-- Start Time: string (nullable = true)
 |-- End Time: string (nullable = true)
 |-- Modified Date: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Day of Week: integer (nullable = true)
 |-- Council District (Start): integer (nullable = true)
 |-- Council District (End): integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Census Tract Start: string (nullable = true)
 |-- Census Tract End: string (nullable = true)



In [98]:
Austin_Trips_Df.head(2)

[Row(ID='69348ea0-87f5-43f2-a763-275a672b14fc', Device ID='7e14cf0c-4861-4780-b227-fc71f8372453', Vehicle Type='scooter', Trip Duration=500, Trip Distance=798, Start Time='11/29/2019 09:00:00 PM', End Time='11/29/2019 09:15:00 PM', Modified Date='12/01/2019 01:00:54 PM', Month=11, Hour=21, Day of Week=5, Council District (Start)=9, Council District (End)=9, Year=2019, Census Tract Start='48453001100', Census Tract End='48453001100'),
 Row(ID='44bf4d97-733f-44b9-ab69-132ad77cebe0', Device ID='37d1e6b6-ea25-4d77-88f2-d6bdc185f142', Vehicle Type='scooter', Trip Duration=500, Trip Distance=1226, Start Time='11/30/2019 10:00:00 PM', End Time='11/30/2019 10:00:00 PM', Modified Date='12/01/2019 12:00:13 PM', Month=11, Hour=22, Day of Week=6, Council District (Start)=3, Council District (End)=3, Year=2019, Census Tract Start='48453002111', Census Tract End='48453000902')]

In [99]:
Austin_Trips_Df.describe()

DataFrame[summary: string, ID: string, Device ID: string, Vehicle Type: string, Trip Duration: string, Trip Distance: string, Start Time: string, End Time: string, Modified Date: string, Month: string, Hour: string, Day of Week: string, Council District (Start): string, Council District (End): string, Year: string, Census Tract Start: string, Census Tract End: string]

# Data cleansing and transformations:

We start our cleansing by running some basic Pyspark functions as shown below:


In [100]:
Austin_Trips_Df.dropDuplicates().count()

2198485

In [101]:
Austin_Trips_Df.dropna().count()

2198461

We decide to remove non used-columns such as Year and modified date

In [102]:
Austin_Trips_Df=Austin_Trips_Df.drop(Austin_Trips_Df['Device ID']).drop(Austin_Trips_Df['Modified Date']).drop(Austin_Trips_Df['Year'])

We find out 8 duplicates rows.
We decide to cast and transform Start Time and End Time columns in order to transform them from String type to timestamp one

In [103]:
from pyspark.sql.types import  (StringType,TimestampType)                               
from datetime import datetime
from pyspark.sql.functions import col,udf

To_TS=  udf(lambda x: datetime.strptime(x, '%m/%d/%Y %I:%M:%S %p'), TimestampType())
Austin_Trips_Df= Austin_Trips_Df.withColumn('Start Time', To_TS(col('Start Time'))).withColumn('End Time',To_TS(col('End Time')))

In [104]:
Austin_Trips_Df.select("Start Time","End Time").show(2)

+-------------------+-------------------+
|         Start Time|           End Time|
+-------------------+-------------------+
|2019-11-29 21:00:00|2019-11-29 21:15:00|
|2019-11-30 22:00:00|2019-11-30 22:00:00|
+-------------------+-------------------+
only showing top 2 rows



In [107]:
import pyspark.sql.types
day = {"1": "MON", "2": "TUE", "3": "WED", "4": "THU", "5": "FRI", "6": "SAT", "0": "SUN"}
day = {k:str(v) for k,v in zip(d2.keys(),d2.values())}
Austin_Trips_Df= Austin_Trips_Df.withColumn('Day of Week', Austin_Trips_Df['Day of Week'].cast("string"))
Austin_Trips_Df = Austin_Trips_Df.na.replace(day,1,"Day of Week")

In [112]:
user_func =  udf (lambda x: True if x in('SAT','SUN') else False)
Austin_Trips_Df= Austin_Trips_Df.withColumn('Is_Weekend',user_func(Austin_Trips_Df['Day of Week']))

We continue our transformation operations and we decide to convert Trip distance from Meter to Kilometer and Trip duration from Second to Hour as well.

In [38]:
Austin_Trips_Df=Austin_Trips_Df.withColumn('Trip Distance',(Austin_Trips_Df['Trip Distance'])/1000).withColumnRenamed('Trip Distance', 'Trip_Distance(km)').withColumn('Trip Duration',(Austin_Trips_Df['Trip Duration'])/3600).withColumnRenamed('Trip Duration', 'Trip_Duration(H)')

In [54]:
Austin_Trips_Df=Austin_Trips_Df.withColumn('Speed',Austin_Trips_Df['Trip_Distance(km)']/Austin_Trips_Df['Trip_Duration(H)'])

Checking the new schema as result of our previous operations 

In [77]:
Austin_Trips_Df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Vehicle Type: string (nullable = true)
 |-- Trip_Duration(H): double (nullable = true)
 |-- Trip_Distance(km): double (nullable = true)
 |-- Start Time: timestamp (nullable = true)
 |-- End Time: timestamp (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Day of Week: string (nullable = true)
 |-- Council District (Start): integer (nullable = true)
 |-- Council District (End): integer (nullable = true)
 |-- Census Tract Start: string (nullable = true)
 |-- Census Tract End: string (nullable = true)
 |-- Speed: double (nullable = true)



In [57]:
Austin_Trips_Df.select("Trip_Duration(H)","Trip_Distance(km)","Speed").describe().show(truncate= True)

+-------+-------------------+------------------+-------------------+
|summary|   Trip_Duration(H)| Trip_Distance(km)|              Speed|
+-------+-------------------+------------------+-------------------+
|  count|            2198485|           2198485|            2198485|
|   mean|0.32679564765939273|2.7320238618867076|  9.574117014114679|
| stddev|0.27763931174384077|1.9427202841884252|  5.205455038152195|
|    min| 0.1388888888888889|               0.5|0.12589467954823788|
|    max|  4.166666666666667|            19.988| 138.41501976284584|
+-------+-------------------+------------------+-------------------+



In [55]:
Austin_Trips_Df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Vehicle Type: string (nullable = true)
 |-- Trip_Duration(H): double (nullable = true)
 |-- Trip_Distance(km): double (nullable = true)
 |-- Start Time: timestamp (nullable = true)
 |-- End Time: timestamp (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Day of Week: integer (nullable = true)
 |-- Council District (Start): integer (nullable = true)
 |-- Council District (End): integer (nullable = true)
 |-- Census Tract Start: string (nullable = true)
 |-- Census Tract End: string (nullable = true)
 |-- Speed: double (nullable = true)



In [49]:
Austin_Trips_Df.groupBy("Day of Week").count().sort("count",ascending=True).show()

+-----------+------+
|Day of Week| count|
+-----------+------+
|          2|205285|
|          3|212124|
|          1|224827|
|          4|253354|
|          5|369949|
|          0|387447|
|          6|545499|
+-----------+------+



In [50]:
Austin_Trips_Df.groupBy("Month").count().sort("count",ascending=True).show()

+-----+------+
|Month| count|
+-----+------+
|   12|   413|
|   11|117461|
|    1|139085|
|    2|147246|
|    8|184916|
|   10|192227|
|    6|195874|
|    7|199113|
|    9|206771|
|    4|218794|
|    5|232842|
|    3|363743|
+-----+------+



In [52]:
Austin_Trips_Df.crosstab('Vehicle Type', 'Month').sort("Vehicle Type_Month").show()

+------------------+------+------+------+---+------+------+------+------+------+------+------+------+
|Vehicle Type_Month|     1|    10|    11| 12|     2|     3|     4|     5|     6|     7|     8|     9|
+------------------+------+------+------+---+------+------+------+------+------+------+------+------+
|           bicycle|  6228| 15949| 10147|  0|  8236| 32245| 22176| 23251| 19403| 16667| 13345| 15821|
|           scooter|132857|176278|107314|413|139010|331498|196618|209591|176471|182446|171571|190950|
+------------------+------+------+------+---+------+------+------+------+------+------+------+------+



In [53]:
Austin_Trips_Df.crosstab('Day of Week', 'Month').sort("Day of Week_Month").show()

+-----------------+-----+-----+-----+---+-----+-----+-----+-----+-----+-----+-----+-----+
|Day of Week_Month|    1|   10|   11| 12|    2|    3|    4|    5|    6|    7|    8|    9|
+-----------------+-----+-----+-----+---+-----+-----+-----+-----+-----+-----+-----+-----+
|                0|25266|36754|21884|413|30258|61927|36897|45605|34702|29481|25278|38982|
|                1|15006|18607|12900|  0|16292|30053|28599|26062|19190|19578|15121|23419|
|                2|16939|17562|10263|  0|13304|33726|25210|18612|16961|20521|15455|16732|
|                3|11658|17357| 9846|  0|16402|36730|16096|25713|20626|22697|17688|17311|
|                4|17452|21969| 8472|  0|14178|45861|24930|27600|20729|28746|23063|20354|
|                5|19630|31641|17789|  0|21506|67774|40988|38575|31360|33579|34989|32118|
|                6|33134|48337|36307|  0|35306|87672|46074|50675|52306|44511|53322|57855|
+-----------------+-----+-----+-----+---+-----+-----+-----+-----+-----+-----+-----+-----+



In [20]:
Austin_Trips_Df.describe(['Speed']).show()

+-------+------------------+
|summary|             Speed|
+-------+------------------+
|  count|           2115406|
|   mean|431.76059691297746|
| stddev|17208.996380778306|
|    min|               0.0|
|    max| 5242221.507692307|
+-------+------------------+



In [24]:
Austin_Trips_Df=Austin_Trips_Df.withColumn('Cost($)',(Austin_Trips_Df['Trip_Duration(H)']*60)*0.15+1)

In [25]:
Austin_Trips_Df.describe(['Cost($)']).show()

+-------+------------------+
|summary|        Revenue($)|
+-------+------------------+
|  count|           2115406|
|   mean| 2.526046323968071|
| stddev|2.1863338249025457|
|    min|            1.0025|
|    max|212.92749999999998|
+-------+------------------+



In [26]:
Austin_Trips_Df.head(2)

[Row(ID='000f0d96-dd73-452d-bb01-e817adaeab0d', Device ID='4a22632e-89db-42e9-88d4-f8bfff68b677', Vehicle Type='scooter', Trip_Duration(H)=0.5366666666666666, Trip_Distance(km)=4.675, Start Time=datetime.datetime(2019, 7, 7, 20, 15), End Time=datetime.datetime(2019, 7, 7, 20, 45), Month=7, Hour=20, Day of Week=0, Council District (Start)=5, Council District (End)=5, Census Tract Start='48453001303', Census Tract End='48453001303', Speed=8.711180124223603, Revenue($)=5.829999999999999),
 Row(ID='008286fd-ad90-4611-b44b-bd169fa37964', Device ID='23716c9d-2821-4a7f-8bd7-3593212f69cd', Vehicle Type='scooter', Trip_Duration(H)=0.10361111111111111, Trip_Distance(km)=2.139, Start Time=datetime.datetime(2019, 7, 10, 9, 45), End Time=datetime.datetime(2019, 7, 10, 9, 45), Month=7, Hour=9, Day of Week=3, Council District (Start)=9, Council District (End)=9, Census Tract Start='48453000603', Census Tract End='48453000601', Speed=20.64450402144772, Revenue($)=1.9325)]

In [None]:
PriceDf.crosstab('Trip_Duration(H)','Cost($)').show(3)