In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

sc = SparkSession.builder.appName("TrafficAnalysis").master("local").config("spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()

In [3]:
traffic =sc.read.text("Dodgers.data")
type(traffic)

pyspark.sql.dataframe.DataFrame

In [4]:
traffic.take(10)

[Row(value='4/10/2005 0:00,-1'),
 Row(value='4/10/2005 0:05,-1'),
 Row(value='4/10/2005 0:10,-1'),
 Row(value='4/10/2005 0:15,-1'),
 Row(value='4/10/2005 0:20,-1'),
 Row(value='4/10/2005 0:25,-1'),
 Row(value='4/10/2005 0:30,-1'),
 Row(value='4/10/2005 0:35,-1'),
 Row(value='4/10/2005 0:40,-1'),
 Row(value='4/10/2005 0:45,-1')]

**<h2>Using format and schema to interpret data file as csv </h2>**

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

schema= StructType([StructField("timestamp",StringType()),StructField("number_of_cars",StringType())])

traffic =sc.read.format("csv").schema(schema).load("Dodgers.data")
type(traffic)

pyspark.sql.dataframe.DataFrame

In [6]:
traffic.show(5)

+--------------+--------------+
|     timestamp|number_of_cars|
+--------------+--------------+
|4/10/2005 0:00|            -1|
|4/10/2005 0:05|            -1|
|4/10/2005 0:10|            -1|
|4/10/2005 0:15|            -1|
|4/10/2005 0:20|            -1|
+--------------+--------------+
only showing top 5 rows



**<h2>Infering schema as false and type casting column two</h2>**

In [7]:
games =sc.read.format("csv").option("inferSchema","false").load("Dodgers.events")

In [8]:
from pyspark.sql.functions import col
traffic =traffic.withColumn("number_of_cars_i", col("number_of_cars").cast("int")).drop("number_of_cars")

#traffic_by_time =traffic.groupBy("timestamp").agg(sum("number_of_cars_i"))

In [9]:
traffic.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- number_of_cars_i: integer (nullable = true)



In [10]:
from pyspark.sql.functions import sum

traffic_by_time = traffic.groupBy("timestamp").agg(sum("number_of_cars_i").alias("no_of_cars_per_time"))

In [11]:
traffic_by_time.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- no_of_cars_per_time: long (nullable = true)



In [12]:
from pyspark.sql.functions import desc

traffic_by_time.sort(desc("no_of_cars_per_time")).show(10)


+---------------+-------------------+
|      timestamp|no_of_cars_per_time|
+---------------+-------------------+
| 7/14/2005 8:25|                 90|
|7/15/2005 14:40|                 75|
| 7/3/2005 23:40|                 72|
| 7/14/2005 8:50|                 70|
|8/26/2005 22:30|                 70|
| 7/14/2005 9:05|                 70|
| 7/3/2005 23:00|                 66|
|4/13/2005 22:10|                 66|
|8/14/2005 15:40|                 64|
|5/13/2005 22:30|                 64|
+---------------+-------------------+
only showing top 10 rows



In [13]:
from pyspark.sql.functions import to_timestamp, date_format
df_datetime = traffic_by_time.withColumn("datetime", to_timestamp("timestamp", "MM/dd/yyyy HH:mm"))

In [14]:
df_datetime.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- no_of_cars_per_time: long (nullable = true)
 |-- datetime: timestamp (nullable = true)



In [15]:
df_datetime.show(10)

+---------------+-------------------+-------------------+
|      timestamp|no_of_cars_per_time|           datetime|
+---------------+-------------------+-------------------+
| 4/10/2005 2:30|                 -1|2005-04-10 02:30:00|
| 4/10/2005 3:00|                 -1|2005-04-10 03:00:00|
| 4/10/2005 4:10|                 -1|2005-04-10 04:10:00|
| 4/10/2005 8:20|                 -1|2005-04-10 08:20:00|
|4/10/2005 11:00|                 -1|2005-04-10 11:00:00|
| 4/11/2005 1:00|                 -1|2005-04-11 01:00:00|
| 4/11/2005 3:20|                 -1|2005-04-11 03:20:00|
|4/11/2005 12:30|                 32|2005-04-11 12:30:00|
|4/12/2005 19:35|                 32|2005-04-12 19:35:00|
|4/13/2005 11:10|                 25|2005-04-13 11:10:00|
+---------------+-------------------+-------------------+
only showing top 10 rows



In [16]:
df_datetime=df_datetime.drop("timestamp")

In [17]:
df_datetime.printSchema()

root
 |-- no_of_cars_per_time: long (nullable = true)
 |-- datetime: timestamp (nullable = true)



In [18]:
df_datetime.show(10)

+-------------------+-------------------+
|no_of_cars_per_time|           datetime|
+-------------------+-------------------+
|                 -1|2005-04-10 02:30:00|
|                 -1|2005-04-10 03:00:00|
|                 -1|2005-04-10 04:10:00|
|                 -1|2005-04-10 08:20:00|
|                 -1|2005-04-10 11:00:00|
|                 -1|2005-04-11 01:00:00|
|                 -1|2005-04-11 03:20:00|
|                 32|2005-04-11 12:30:00|
|                 32|2005-04-12 19:35:00|
|                 25|2005-04-13 11:10:00|
+-------------------+-------------------+
only showing top 10 rows



In [19]:
from pyspark.sql.functions import date_format
df_datetime=df_datetime.withColumn("date",  date_format("datetime", "MM/dd/yyyy")).drop("datetime")

In [20]:
df_datetime.printSchema()

root
 |-- no_of_cars_per_time: long (nullable = true)
 |-- date: string (nullable = true)



In [21]:
df_datetime.show(10)

+-------------------+----------+
|no_of_cars_per_time|      date|
+-------------------+----------+
|                 -1|04/10/2005|
|                 -1|04/10/2005|
|                 -1|04/10/2005|
|                 -1|04/10/2005|
|                 -1|04/10/2005|
|                 -1|04/11/2005|
|                 -1|04/11/2005|
|                 32|04/11/2005|
|                 32|04/12/2005|
|                 25|04/13/2005|
+-------------------+----------+
only showing top 10 rows



**<h2>Getting insights on cars per day</h2>**

In [22]:
from pyspark.sql.functions import sum,desc

df_datetime=df_datetime.groupBy("date").agg(sum("no_of_cars_per_time").alias("cars_per_date")).orderBy(desc("cars_per_date"))

In [23]:
df_datetime.printSchema()

root
 |-- date: string (nullable = true)
 |-- cars_per_date: long (nullable = true)



In [24]:
df_datetime.show()

+----------+-------------+
|      date|cars_per_date|
+----------+-------------+
|07/28/2005|         7661|
|07/29/2005|         7499|
|08/12/2005|         7287|
|07/27/2005|         7238|
|09/23/2005|         7175|
|07/26/2005|         7163|
|05/20/2005|         7119|
|08/11/2005|         7110|
|09/08/2005|         7107|
|09/07/2005|         7082|
|09/30/2005|         7079|
|08/10/2005|         7060|
|07/22/2005|         7028|
|08/05/2005|         6924|
|09/29/2005|         6917|
|07/25/2005|         6898|
|09/09/2005|         6897|
|09/16/2005|         6885|
|09/28/2005|         6831|
|04/12/2005|         6822|
+----------+-------------+
only showing top 20 rows



In [25]:
from pyspark.sql.functions import asc

df_datetime.orderBy(asc("cars_per_date")).show(10)

+----------+-------------+
|      date|cars_per_date|
+----------+-------------+
|04/10/2005|         -288|
|10/01/2005|         -260|
|06/28/2005|          -96|
|07/04/2005|          328|
|07/12/2005|         1204|
|05/23/2005|         2173|
|09/17/2005|         2426|
|09/10/2005|         2851|
|06/27/2005|         2907|
|07/10/2005|         3518|
+----------+-------------+
only showing top 10 rows



**<h2>to remove the -ve values</h2>**

In [26]:
from pyspark.sql.functions import col, when

df_datetime_clean=df_datetime.withColumn("cars_per_date", when(col("cars_per_date") < 0, 0).otherwise(col("cars_per_date")))

In [27]:
df_datetime_clean.sort(asc("cars_per_date")).show(10)

+----------+-------------+
|      date|cars_per_date|
+----------+-------------+
|06/28/2005|            0|
|04/10/2005|            0|
|10/01/2005|            0|
|07/04/2005|          328|
|07/12/2005|         1204|
|05/23/2005|         2173|
|09/17/2005|         2426|
|09/10/2005|         2851|
|06/27/2005|         2907|
|07/10/2005|         3518|
+----------+-------------+
only showing top 10 rows



**<h2>Getting insights on cars per day and combining it with event day data</h2>**

In [28]:
games.show(10)

+--------+--------+--------+-----+-------------+------+
|     _c0|     _c1|     _c2|  _c3|          _c4|   _c5|
+--------+--------+--------+-----+-------------+------+
|04/12/05|13:10:00|16:23:00|55892|San Francisco|W 9-8�|
|04/13/05|19:10:00|21:48:00|46514|San Francisco|W 4-1�|
|04/15/05|19:40:00|21:48:00|51816|    San Diego|W 4-0�|
|04/16/05|19:10:00|21:52:00|54704|    San Diego|W 8-3�|
|04/17/05|13:10:00|15:31:00|53402|    San Diego|W 6-0�|
|04/25/05|19:10:00|21:33:00|36876|      Arizona|L 4-2�|
|04/26/05|19:10:00|22:00:00|44486|      Arizona|L 3-2�|
|04/27/05|19:10:00|22:17:00|54387|      Arizona|L 6-3�|
|04/29/05|19:40:00|22:01:00|40150|     Colorado|W 6-3�|
|04/30/05|19:10:00|21:45:00|54123|     Colorado|W 6-2�|
+--------+--------+--------+-----+-------------+------+
only showing top 10 rows



In [29]:
games= games.withColumn("date", to_timestamp("_c0", "MM/dd/yy"))

In [30]:
from pyspark.sql.functions import col
games = games.withColumn("against", col("_c4"))

In [31]:
games=games.select("date","against")

In [32]:
games.show()

+-------------------+-------------+
|               date|      against|
+-------------------+-------------+
|2005-04-12 00:00:00|San Francisco|
|2005-04-13 00:00:00|San Francisco|
|2005-04-15 00:00:00|    San Diego|
|2005-04-16 00:00:00|    San Diego|
|2005-04-17 00:00:00|    San Diego|
|2005-04-25 00:00:00|      Arizona|
|2005-04-26 00:00:00|      Arizona|
|2005-04-27 00:00:00|      Arizona|
|2005-04-29 00:00:00|     Colorado|
|2005-04-30 00:00:00|     Colorado|
|2005-05-01 00:00:00|     Colorado|
|2005-05-02 00:00:00|   Washington|
|2005-05-03 00:00:00|   Washington|
|2005-05-04 00:00:00|   Washington|
|2005-05-13 00:00:00|      Atlanta|
|2005-05-14 00:00:00|      Atlanta|
|2005-05-15 00:00:00|      Atlanta|
|2005-05-16 00:00:00|      Florida|
|2005-05-17 00:00:00|      Florida|
|2005-05-18 00:00:00|      Florida|
+-------------------+-------------+
only showing top 20 rows



In [33]:
from pyspark.sql.functions import to_timestamp, date_format
games= games.withColumn("date_m", date_format("date","dd/MM/yyy")).drop("date")
games.show()

+-------------+----------+
|      against|    date_m|
+-------------+----------+
|San Francisco|12/04/2005|
|San Francisco|13/04/2005|
|    San Diego|15/04/2005|
|    San Diego|16/04/2005|
|    San Diego|17/04/2005|
|      Arizona|25/04/2005|
|      Arizona|26/04/2005|
|      Arizona|27/04/2005|
|     Colorado|29/04/2005|
|     Colorado|30/04/2005|
|     Colorado|01/05/2005|
|   Washington|02/05/2005|
|   Washington|03/05/2005|
|   Washington|04/05/2005|
|      Atlanta|13/05/2005|
|      Atlanta|14/05/2005|
|      Atlanta|15/05/2005|
|      Florida|16/05/2005|
|      Florida|17/05/2005|
|      Florida|18/05/2005|
+-------------+----------+
only showing top 20 rows



In [34]:
games = games.withColumn("date", col("date_m")).drop("date_m")

In [35]:
games.printSchema()

root
 |-- against: string (nullable = true)
 |-- date: string (nullable = true)



In [36]:
dailyTrendCombined = df_datetime_clean.join(games, on="date", how="left")

**<h2>Creating new column with day name based on the against column</h2>**

In [41]:
from pyspark.sql.functions import col
dailyTrendCombined = dailyTrendCombined.withColumn("day", when(col("against").isNull()
                                                               , "Regular Day").otherwise("Match Day"))

In [None]:
dailyTrendCombined.select

In [47]:
dailyTrendCombined.()

+----+-------------+-------+---+
|date|cars_per_date|against|day|
+----+-------------+-------+---+
+----+-------------+-------+---+
only showing top 0 rows



In [43]:
dailyTrendCombined.count()

175

**Combine by key on**

In [45]:
from pyspark.sql.functions import avg
avg_score = dailyTrendCombined.groupBy("day").agg(avg("cars_per_date").alias("avg_cars_on"))

In [46]:
avg_score.show()

+-----------+-----------------+
|        day|      avg_cars_on|
+-----------+-----------------+
|  Match Day|6063.111111111111|
|Regular Day|5642.036144578313|
+-----------+-----------------+

