## Problem 09

Weightage: 40

Join the trips data with the stations data to produce a denormalized table that contains every field from trips plus `startstationname` and `endstationname`.


## Data Description
All of the citibike trip data is available under **/public/citibike/trips**. It contains multiple folders, one for each month. Here is the schema.

```
root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: string (nullable = true)
 |-- endstationid: string (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
```
All of the citibike station data is available under **/public/citibike/stations**. Here is the schema.
```
root
 |-- stationid: long (nullable = true)
 |-- stationlatitude: string (nullable = true)
 |-- stationlongitude: string (nullable = true)
 |-- stationname: string (nullable = true)
```

## Output Requirements
* Place the result in the following HDFS directory:
```
/user/`whoami`/mock_test_02/problem09/solution
```
* Use the Parquet file format. Any number of files is acceptable.
* The output must have the following column names and data types.
```
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: integer (nullable = true)
 |-- endstationid: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationname: string (nullable = true)
 |-- endstationname: string (nullable = true)
```
* There are no requirements for sorting the data.

## Validation

Here are the self-validation steps:
* Run the following to check the number of files.
```
hdfs dfs -ls /user/`whoami`/mock_test_02/problem09/solution
```
* Run this code to create a DataFrame named `data`.
```
import getpass
username = getpass.getuser()
data = spark.read. \
    parquet(f'/user/{username}/mock_test_02/problem09/solution')
```
* Run `data.printSchema()` to validate the data types of the fields.
```
root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: integer (nullable = true)
 |-- endstationid: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationname: string (nullable = true)
 |-- endstationname: string (nullable = true)
```
* Run `data.count()` to validate the number of records. It should be **54462016**.
* Run `data.show()` to preview the data. Make sure all the data is showing up as expected.
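
The schema check in the steps above can also be automated. The following is a minimal sketch, assuming the list of `(name, type)` pairs returned by `data.dtypes`, where Spark reports `integer` as `int`. The helper name `schema_mismatches` and the `EXPECTED_TYPES` mapping are illustrative and not part of the original exercise.

```python
# Expected output columns, using the short type names that DataFrame.dtypes reports.
EXPECTED_TYPES = {
    'tripduration': 'int', 'starttime': 'timestamp', 'stoptime': 'timestamp',
    'startstationid': 'int', 'endstationid': 'int', 'bikeid': 'int',
    'usertype': 'string', 'birthyear': 'string', 'gender': 'int',
    'month': 'int', 'startstationname': 'string', 'endstationname': 'string',
}


def schema_mismatches(dtypes, expected=EXPECTED_TYPES):
    """Return {column: (expected_type, actual_type)} for every column that differs.

    A missing column has actual_type None; an unexpected column has expected_type None.
    """
    actual = dict(dtypes)
    problems = {}
    for name, wanted in expected.items():
        if actual.get(name) != wanted:
            problems[name] = (wanted, actual.get(name))
    for name, found in actual.items():
        if name not in expected:
            problems[name] = (None, found)
    return problems


# Hypothetical input: a result that matches except for one type and one misspelled name.
sample_dtypes = [(name, kind) for name, kind in EXPECTED_TYPES.items()
                 if name != 'startstationname']
sample_dtypes[0] = ('tripduration', 'string')
sample_dtypes.append(('startstationanme', 'string'))
print(schema_mismatches(sample_dtypes))
```

With a real result, `schema_mismatches(data.dtypes)` returns an empty dictionary when every column name and type matches the requirement.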

In [20]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName(f'Problem 09 | {username}'). \
    master('yarn'). \
    getOrCreate()

In [21]:
trips_df=spark.read.csv('/public/citibike/trips/',header=True)

In [22]:
station_df=spark.read.json('/public/citibike/stations')

In [23]:
from pyspark.sql.functions import date_format,count,lit,col
station_df.filter(col('stationid')==504).select('stationname')

stationname
1 Ave & E 16 St


In [24]:
from pyspark.sql.functions import date_format,count,lit,col
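# Look up the name of the start station. The renamed column is spelled
# 'startstationanme' here, which differs from the required 'startstationname'.
joined_df1=trips_df.join(station_df,
                         on=trips_df.startstationid.cast('int')==station_df.stationid.cast("int"),
                         how="inner"). \
            withColumnRenamed('stationname',"startstationanme")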


In [25]:
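# Look up the name of the end station. The renamed column is spelled
# 'endstationanme' here, which differs from the required 'endstationname'.
joined_df2=trips_df.join(station_df,
                         on=trips_df.endstationid.cast('int')==station_df.stationid.cast("int"),
                         how="inner"). \
           withColumnRenamed('stationname',"endstationanme")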

In [35]:
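# Both sides are keyed on startstationid only, so each trip is matched with every
# trip that starts at the same station and picks up all of their end station names.
output=joined_df1.join(joined_df2,on=joined_df1.startstationid==joined_df2.startstationid, how="inner"). \
        select(joined_df1['*'],joined_df2.endstationanme). \
        drop("stationid","stationlatitude","stationlongitude")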

In [31]:
output.show()
        

+------------+-------------------+-------------------+--------------+------------+------+----------+---------+------+------+--------------------+--------------------+
|tripduration|          starttime|           stoptime|startstationid|endstationid|bikeid|  usertype|birthyear|gender| month|    startstationanme|      endstationanme|
+------------+-------------------+-------------------+--------------+------------+------+----------+---------+------+------+--------------------+--------------------+
|        1402|2017-05-01 00:15:50|2017-05-01 00:39:13|           296|        3258| 17885|Subscriber|     1963|     1|201705|Division St & Bowery|Barclay St & Chur...|
|        1402|2017-05-01 00:15:50|2017-05-01 00:39:13|           296|        3258| 17885|Subscriber|     1963|     1|201705|Division St & Bowery|Allen St & Hester St|
|        1402|2017-05-01 00:15:50|2017-05-01 00:39:13|           296|        3258| 17885|Subscriber|     1963|     1|201705|Division St & Bowery|     E 27 St & 1 Ave

In [32]:
output.printSchema()

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- startstationid: string (nullable = true)
 |-- endstationid: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationanme: string (nullable = true)
 |-- endstationanme: string (nullable = true)
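
The three rows shown above describe the same trip (same `starttime` and `bikeid`) with different end station names, which is consistent with a many-to-many match on `startstationid`. The following plain-Python sketch shows how quickly such a join can grow, assuming each trip is paired with every trip that shares its start station. The helper `self_join_row_count` and the sample ids are illustrative only.

```python
from collections import Counter


def self_join_row_count(start_station_ids):
    """Rows produced by an inner self-join of trips on the start station id alone."""
    per_station = Counter(start_station_ids)
    # A station with n trips contributes n * n matched pairs.
    return sum(n * n for n in per_station.values())


# Hypothetical sample: three trips start at station 296 and one at station 504.
sample = ['296', '296', '296', '504']
print(len(sample), self_join_row_count(sample))  # 4 trips -> 3*3 + 1*1 = 10 rows
```

A key that identifies one station per lookup, as in a separate join for each of `startstationid` and `endstationid`, keeps the row count at one row per trip.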



In [10]:
table1_df = spark.read.option("header", True).csv("/public/citibike/trips/month=*")

In [11]:
table2_df2 = spark.read.json("/public/citibike/stations")

In [12]:
join_df = table1_df.join(table2_df2, table1_df.startstationid == table2_df2.stationid,"inner")

In [13]:
from pyspark.sql.functions import date_format, count, lit, col, split, concat

# Cast the CSV string columns to the required types.
output = join_df.select(col("tripduration").cast("int"), col("starttime").cast("timestamp"),
                         col("stoptime").cast("timestamp"), col("startstationid").cast("int"), 
                         col("endstationid").cast("int"), col("bikeid").cast("int"), 
                         "usertype", "birthyear", col("gender").cast("int"),
                         # month is derived from starttime, so it holds a value from 1 to 12
                         date_format("starttime", "MM").cast("int").alias("month"),
                         # join_df carries only the start station's name; splitting it on "&"
                         # yields its two cross streets, not the name of the end station
                         split(col("stationname"), "&")[0].alias("startstationname"),
                         split(col("stationname"), "&")[1].alias("endstationname"))

In [14]:
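import getpass
username = getpass.getuser()  # resolve the user instead of hard-coding the home directory
output. \
    coalesce(2). \
    write. \
    parquet(f'/user/{username}/mock_test_02/problem09/solution',mode='overwrite')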

In [15]:
%%sh
hdfs dfs -ls /user/${USER}/mock_test_02/problem09/solution

Found 3 items
-rw-r--r--   3 itv002461 supergroup          0 2022-07-12 04:48 /user/itv002461/mock_test_02/problem09/solution/_SUCCESS
-rw-r--r--   3 itv002461 supergroup  616821918 2022-07-12 04:47 /user/itv002461/mock_test_02/problem09/solution/part-00000-d3ea4329-0d91-4bdb-b7a1-a68e4ff25d5f-c000.snappy.parquet
-rw-r--r--   3 itv002461 supergroup  673340921 2022-07-12 04:48 /user/itv002461/mock_test_02/problem09/solution/part-00001-d3ea4329-0d91-4bdb-b7a1-a68e4ff25d5f-c000.snappy.parquet


In [16]:
import getpass
username = getpass.getuser()
data = spark.read. \
  parquet(f'/user/{username}/mock_test_02/problem09/solution')

In [17]:
data.printSchema()

root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- startstationid: integer (nullable = true)
 |-- endstationid: integer (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- startstationname: string (nullable = true)
 |-- endstationname: string (nullable = true)



In [18]:
data.count()

54462016

In [19]:
data.show()

+------------+--------------------+--------------------+--------------+------------+------+----------+---------+------+-----+--------------------+------------------+
|tripduration|           starttime|            stoptime|startstationid|endstationid|bikeid|  usertype|birthyear|gender|month|    startstationname|    endstationname|
+------------+--------------------+--------------------+--------------+------------+------+----------+---------+------+-----+--------------------+------------------+
|         327|2019-09-01 00:00:...|2019-09-01 00:05:...|          3733|         504| 39213|Subscriber|     1968|     1|    9|           Avenue C |           E 18 St|
|        2219|2019-09-29 12:04:...|2019-09-29 12:41:...|          3372|        3686| 18261|Subscriber|     1974|     2|    9|            E 74 St |             1 Ave|
|        1145|2019-09-01 00:00:...|2019-09-01 00:19:...|          3329|         270| 21257|  Customer|     1969|     0|    9|          Degraw St |          Smith St|
|   