In [80]:
from pyspark.sql import SparkSession
import zipfile
from pyspark.sql.functions import col, unix_timestamp,to_date,avg,sum,min,max,count,month,current_date,year

In [2]:

spark = SparkSession.builder\
    .config("spark.driver.bindAddress", "localhost") \
    .getOrCreate() 

In [3]:

# Create DataFrame
data = [
("James", "zen", "Smith", "1991-04-01", "M", 3000),
("Michael", "Rose", "zwee", "2000-05-19", "M", 4000),
("Robert", "", "Williams", "1978-09-05", "M", 4000),
("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
("Jen", "Mary", "Brown", "1980-02-17", "F", 400),
]
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |zen       |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |zwee    |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |400   |
+---------+----------+--------+----------+------+------+



In [52]:

with zipfile.ZipFile("PythonPractice\data-engineering-practice-main\Exercises\Exercise-6\data\Divvy_Trips_2019_Q4.zip") as archive:
    text = archive.read("Divvy_Trips_2019_Q4.csv").decode(encoding="utf-8")

#print(text)

df = spark.read.option("header", "true").csv(spark.sparkContext.parallelize(text.split("\n")))
#df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("\n")))

print(df.show(truncate=False))

+--------+-------------------+-------------------+------+------------+---------------+------------------------------+-------------+----------------------------+----------+------+---------+
|trip_id |start_time         |end_time           |bikeid|tripduration|from_station_id|from_station_name             |to_station_id|to_station_name             |usertype  |gender|birthyear|
+--------+-------------------+-------------------+------+------------+---------------+------------------------------+-------------+----------------------------+----------+------+---------+
|25223640|2019-10-01 00:01:39|2019-10-01 00:17:20|2215  |940.0       |20             |Sheffield Ave & Kingsbury St  |309          |Leavitt St & Armitage Ave   |Subscriber|Male  |1987     |
|25223641|2019-10-01 00:02:16|2019-10-01 00:06:34|6328  |258.0       |19             |Throop (Loomis) St & Taylor St|241          |Morgan St & Polk St         |Subscriber|Male  |1998     |
|25223642|2019-10-01 00:04:32|2019-10-01 00:18:43|3003 

In [82]:

df = df.withColumn('end_time', col('end_time').cast('Timestamp'))
df = df.withColumn('start_time', col('start_time').cast('Timestamp'))
df = df.withColumn('birthyear', col('birthyear').cast('Integer'))

df = df.withColumn('tripduration',unix_timestamp(col('end_time')) -unix_timestamp(col('start_time')))
df = df.withColumn('tripdate',to_date(col('end_time')))
df = df.withColumn('age',year(current_date()) - col('birthyear'))


In [83]:
print(df.show(truncate=False))

+--------+-------------------+-------------------+------+------------+---------------+------------------------------+-------------+----------------------------+----------+------+---------+----------+----+
|trip_id |start_time         |end_time           |bikeid|tripduration|from_station_id|from_station_name             |to_station_id|to_station_name             |usertype  |gender|birthyear|tripdate  |age |
+--------+-------------------+-------------------+------+------------+---------------+------------------------------+-------------+----------------------------+----------+------+---------+----------+----+
|25223640|2019-10-01 00:01:39|2019-10-01 00:17:20|2215  |941         |20             |Sheffield Ave & Kingsbury St  |309          |Leavitt St & Armitage Ave   |Subscriber|Male  |1987     |2019-10-01|38  |
|25223641|2019-10-01 00:02:16|2019-10-01 00:06:34|6328  |258         |19             |Throop (Loomis) St & Taylor St|241          |Morgan St & Polk St         |Subscriber|Male  |19

In [55]:
df.groupby('tripdate').agg(
    avg('tripduration').alias('avgtripduration'),
    count('trip_id').alias('totaltrip')
    ).show()

+----------+------------------+---------+
|  tripdate|   avgtripduration|totaltrip|
+----------+------------------+---------+
|2019-10-05|1318.4542840713054|    10434|
|2019-10-01| 972.1641100957354|    18384|
|2019-10-22| 967.5284884775903|    11022|
|2019-10-04|  950.214849921011|    14559|
|2019-12-14| 4416.052333804809|     3535|
|2019-10-02| 812.9518279135528|     9902|
|2019-10-08|1025.5990415335464|    17528|
|2019-10-14|1308.5160846254166|    13802|
|2019-10-15| 1082.923094271538|    13302|
|2019-10-07| 1113.469835572024|    17272|
|2019-10-06|1663.7502235469449|    13420|
|2019-10-03| 904.1126986160943|    15608|
|2019-10-09|1005.6840790238234|    17210|
|2019-11-09|1792.5347364499908|     5369|
|2019-10-21| 963.1177872340426|    11750|
|2020-01-13|         8585902.0|        1|
|2019-10-12|1409.6069537186277|     8686|
|2019-10-20|1382.7980512036684|    10468|
|2019-10-10| 954.5956291056089|    15832|
|2019-10-16| 876.7375368846094|    12878|
+----------+------------------+---

In [70]:
df_new = df.groupby(month('tripdate').alias('tripmonth'),'from_station_id').agg(
    count('from_station_id').alias('numberoftrip')
)
maxtrip_df = df_new.groupBy('tripmonth').agg(max('numberoftrip').alias('maxnumbertrip'))

final_df= df_new.join(maxtrip_df,
                      on = 'tripmonth'
                      ).filter(
                          df_new['numberoftrip'] == maxtrip_df['maxnumbertrip']).select(
                              'tripmonth','from_station_id','maxnumbertrip'
                          )
final_df.show()

+---------+---------------+-------------+
|tripmonth|from_station_id|maxnumbertrip|
+---------+---------------+-------------+
|       10|            192|         6564|
|       11|            192|         3445|
|       12|            192|         2928|
|        1|             90|            5|
+---------+---------------+-------------+



In [72]:
df.groupby('gender').agg(
    avg('tripduration').alias('avgtripduration')
).show()

+------+------------------+
|gender|   avgtripduration|
+------+------------------+
|  NULL| 3826.708819510144|
|Female|1102.4052011867236|
|  Male| 855.7464493410545|
+------+------------------+



In [88]:
df_age = df.groupby('age').agg(
    avg('tripduration').alias('avgtripduration')
).orderBy('avgtripduration')

In [95]:
df_age.limit(10).show()

+---+------------------+
|age|   avgtripduration|
+---+------------------+
| 94|             282.5|
| 96|             298.5|
|104|323.53684210526313|
| 88|            336.75|
|125|          431.3125|
| 82|493.42857142857144|
| 84| 557.3333333333334|
| 83|             575.5|
|124|             689.0|
| 64| 727.8381351492928|
+---+------------------+



In [100]:
df_age.orderBy('avgtripduration').tail(10)

[Row(age=43, avgtripduration=1201.046947847772),
 Row(age=28, avgtripduration=1211.014277215943),
 Row(age=25, avgtripduration=1261.079336458709),
 Row(age=22, avgtripduration=1271.3225806451612),
 Row(age=92, avgtripduration=1298.0),
 Row(age=121, avgtripduration=1420.99),
 Row(age=23, avgtripduration=2059.361010830325),
 Row(age=26, avgtripduration=2697.7215509974712),
 Row(age=None, avgtripduration=3986.4971385029426),
 Row(age=86, avgtripduration=4123.5)]