In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, isnan, when, count, hour, day,month, year, weekday, udf
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

In [2]:
# Start (or reuse) the Spark session that every cell below runs on.
spark = (
    SparkSession.builder
    .appName('uber_data_processing')
    .getOrCreate()
)

In [3]:
# Load the raw trip records: the first CSV row holds the column names, and Spark infers
# each column's type from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('uber_data_2016.csv')
)

In [4]:
# Parse both trip timestamps using the 'dd-MM-yyyy HH:mm' pattern.
# Each column must be parsed from ITSELF. Parsing the dropoff column from
# tpep_pickup_datetime copies the pickup time into it, which makes every trip
# duration computed further down equal to zero.
TIMESTAMP_FORMAT = 'dd-MM-yyyy HH:mm'
for ts_column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df = df.withColumn(ts_column, to_timestamp(ts_column, TIMESTAMP_FORMAT))

# Drop rows in which every column is null, then rows that have no VendorID.
df = df.na.drop(how='all')
df = df.na.drop(how='all', subset=['VendorID'])

In [5]:
# Report how many null cells each column holds. The output is one row with one count
# per input column, each named after that column.
df.select(
    *[count(when(col(column).isNull(), column)).alias(column) for column in df.columns]
).show()

+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RatecodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       0|                   0|                    0|              0|            0|               0|              0|         0|                 0|              

In [6]:
# Date/time dimension: both raw timestamps plus their hour/day/month/year/weekday parts,
# keyed by a generated datetime_id. The expressions are listed directly in the final
# column order, so no second re-ordering select is needed.
datetime_dim = df.selectExpr(
    'monotonically_increasing_id() as datetime_id',
    # pickup side
    'tpep_pickup_datetime',
    'hour(tpep_pickup_datetime) as pickup_hour',
    'day(tpep_pickup_datetime) as pickup_day',
    'month(tpep_pickup_datetime) as pickup_month',
    'year(tpep_pickup_datetime) as pickup_year',
    'weekday(tpep_pickup_datetime) as pickup_weekday',
    # dropoff side
    'tpep_dropoff_datetime',
    'hour(tpep_dropoff_datetime) as dropoff_hour',
    'day(tpep_dropoff_datetime) as dropoff_day',
    'month(tpep_dropoff_datetime) as dropoff_month',
    'year(tpep_dropoff_datetime) as dropoff_year',
    'weekday(tpep_dropoff_datetime) as dropoff_weekday',
)


In [7]:
# Location dimension: pickup and dropoff coordinates of each trip, keyed by a generated
# location_id.
location_dim = df.selectExpr(
    'monotonically_increasing_id() as location_id',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
)

In [8]:
# Single-attribute dimensions, each keyed by its own generated id.
payment_type_dim = df.selectExpr(
    'monotonically_increasing_id() as payment_type_id',
    'payment_type',
)
rate_code_dim = df.selectExpr(
    'monotonically_increasing_id() as rate_code_id',
    'RateCodeID',
)

In [17]:
# Fact table: one row per trip with its monetary/trip measures, keyed by a generated trip_id.
fact_table = df.selectExpr(
    'monotonically_increasing_id() as trip_id',
    'VendorID',
    'passenger_count',
    'trip_distance',
    'store_and_fwd_flag',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
)

# Denormalised view: left-join every dimension back onto its trip.
# NOTE(review): the ids only line up because every frame numbers the rows of the same `df`
# with monotonically_increasing_id(). That holds only while they all share df's partitioning.
all_table = (
    fact_table
    .join(datetime_dim, fact_table['trip_id'] == datetime_dim['datetime_id'], how='left')
    .join(location_dim, fact_table['trip_id'] == location_dim['location_id'], how='left')
    .join(rate_code_dim, fact_table['trip_id'] == rate_code_dim['rate_code_id'], how='left')
    .join(payment_type_dim, fact_table['trip_id'] == payment_type_dim['payment_type_id'], how='left')
)
all_table.show()

+-------+--------+---------------+-------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+-----------+--------------------+-----------+----------+------------+-----------+--------------+---------------------+------------+-----------+-------------+------------+---------------+-----------+----------------+---------------+-----------------+----------------+------------+----------+---------------+------------+
|trip_id|VendorID|passenger_count|trip_distance|store_and_fwd_flag|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|datetime_id|tpep_pickup_datetime|pickup_hour|pickup_day|pickup_month|pickup_year|pickup_weekday|tpep_dropoff_datetime|dropoff_hour|dropoff_day|dropoff_month|dropoff_year|dropoff_weekday|location_id|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|rate_code_id|RateCodeID|payment_type_id|payment_type|
+-------+--------+---------------+-------------+--------

# Extra data manipulation for further analysis


In [10]:
# Trip duration in seconds, minutes and hours (built with `withColumn`).
df = df.withColumn(
    'trip_duration_in_seconds',
    F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime'),
)
df = df.withColumn('trip_duration_minutes', F.col('trip_duration_in_seconds') / 60)
df = df.withColumn('trip_duration_hours', F.col('trip_duration_in_seconds') / 3600)

# Trip distance scaled by 1000 and split per passenger (built with `withColumn`).
# Column names keep the existing 'pessenger' spelling so later references still match.
# NOTE(review): the NYC TLC yellow-taxi data dictionary (whose column names this file uses)
# gives trip_distance in miles. If this file follows it, "* 1000" does not yield meters
# (1 mile = 1609.344 m). Confirm the unit.
# Trips with passenger_count <= 0 (or null) get a null per-passenger distance. This keeps
# the result independent of Spark's division-by-zero behaviour, which is null by default
# and an error when spark.sql.ansi.enabled is true.
has_passengers = F.col('passenger_count') > 0
df = (
    df.withColumn('trip_distance_in_meters', F.col('trip_distance') * 1000)
    .withColumn(
        'distance_per_pessenger',
        F.when(has_passengers, F.col('trip_distance_in_meters') / F.col('passenger_count')),
    )
    .withColumn('distance_per_pessenger_in_kms', F.col('distance_per_pessenger') / 1000)
)

# The same three distance columns built with `selectExpr` instead, as a standalone frame.
df_extra = df.selectExpr(
    "trip_distance * 1000 as trip_distance_in_meters",
    "CASE WHEN passenger_count > 0 THEN (trip_distance * 1000) / passenger_count END as distance_per_pessenger",
    "CASE WHEN passenger_count > 0 THEN ((trip_distance * 1000) / passenger_count) / 1000 END as distance_per_pessenger_in_kms",
)


In [20]:
# Export the joined star-schema view to a single local CSV file.
# NOTE(review): toPandas() collects every row onto the driver. That is fine for a small
# file, but for the full dataset prefer all_table.write.csv(...) to avoid driver OOM.
import pandas as pd
pandas_df = all_table.toPandas()
# index=False: the default RangeIndex carries no information (trip_id already identifies
# each row) and would otherwise be written as an unnamed leading column.
pandas_df.to_csv('all_tables.csv', index=False)

In [None]:
'''
Use below code for AWS glue to import the raw data from AWS S3 and Write the transformed data into AWS Redshift 
def MyTransform(glueContext, dfc) -> DynamicFrame:
    from pyspark.sql import SparkSession
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, isnan, when, count, hour, day,month, year, weekday, udf, to_timestamp, monotonically_increasing_id
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    
    spark = glueContext.spark_session

    
    s3_input_path = 's3://uber-project/Uber_Raw_Data/uber_data_2016.csv'
    
    
    df = spark.read.csv(s3_input_path, header=True, inferSchema=True)
    
    
    df.printSchema()
    
    
    datetime_dim = df.selectExpr(
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'hour(tpep_pickup_datetime) as pickup_hour',
    'day(tpep_pickup_datetime) as pickup_day',
    'month(tpep_pickup_datetime) as pickup_month',
    'year(tpep_pickup_datetime) as pickup_year',
    'weekday(tpep_pickup_datetime) as pickup_weekday',
    'hour(tpep_dropoff_datetime) as dropoff_hour',
    'day(tpep_dropoff_datetime) as dropoff_day',
    'month(tpep_dropoff_datetime) as dropoff_month',
    'year(tpep_dropoff_datetime) as dropoff_year',
    'weekday(tpep_dropoff_datetime) as dropoff_weekday',
    'monotonically_increasing_id() as datetime_id'
    )
    datetime_dim=datetime_dim.select('datetime_id','tpep_pickup_datetime','pickup_hour','pickup_day','pickup_month','pickup_year','pickup_weekday','tpep_dropoff_datetime','dropoff_hour','dropoff_day','dropoff_month','dropoff_year','dropoff_weekday')

    location_dim=df.selectExpr('monotonically_increasing_id() as location_id','pickup_longitude','pickup_latitude','dropoff_longitude','dropoff_latitude')
    payment_type_dim=df.selectExpr('monotonically_increasing_id() as payment_type_id', 'payment_type')
    rate_code_dim=df.selectExpr('monotonically_increasing_id() as rate_code_id', 'RateCodeID')
    
    fact_table=df.selectExpr('monotonically_increasing_id() as trip_id','VendorID','passenger_count','trip_distance','store_and_fwd_flag','fare_amount','extra',
                         'mta_tax','tip_amount','tolls_amount','improvement_surcharge','total_amount')
                         

    # Convert Spark DataFrame to Glue DynamicFrame
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
    datetime_dynamic_frame = DynamicFrame.fromDF(datetime_dim, glueContext, "datetime_dynamic_frame")
    location_dynamic_frame = DynamicFrame.fromDF(location_dim, glueContext, "location_dynamic_frame")
    payment_type_dynamic_frame = DynamicFrame.fromDF(payment_type_dim, glueContext, "payment_type_dynamic_frame")
    rate_code_dynamic_frame = DynamicFrame.fromDF(rate_code_dim, glueContext, "rate_code_dynamic_frame")
    fact_dynamic_frame = DynamicFrame.fromDF(fact_table, glueContext, "fact_dynamic_frame")
    
    #Define the redshift connection ---------- Modify below details as per your configuration
    redshift_options = {
        "url": "jdbc:redshift://<redshift-cluster-url>:<port>/<database>",
        "user": "<Enter your redshift username>",
        "password": "<Enter your redshift password>",
        "dbtable": "<redshift-schema-name>.<fact_table>",  
        "aws_iam_role": "<iam-role-arn>"  
    }
    
    
     # Write the fact table to Redshift
    redshift_fact_options = redshift_options.copy()
    redshift_fact_options["dbtable"] = "<redshift-schema-name>.fact_table"
    glueContext.write_dynamic_frame.from_options(
        fact_dynamic_frame,
        connection_type="redshift",
        connection_options=redshift_fact_options
    )

    # Write the datetime dimension table to Redshift
    redshift_datetime_options = redshift_options.copy()
    redshift_datetime_options["dbtable"] = "<redshift-schema-name>.datetime_dim"
    glueContext.write_dynamic_frame.from_options(
        datetime_dynamic_frame,
        connection_type="redshift",
        connection_options=redshift_datetime_options
    )

    
    redshift_location_options = redshift_options.copy()
    redshift_location_options["dbtable"] = "<redshift-schema-name>.location_dim"
    glueContext.write_dynamic_frame.from_options(
        location_dynamic_frame,
        connection_type="redshift",
        connection_options=redshift_location_options
    )

   
    redshift_payment_type_options = redshift_options.copy()
    redshift_payment_type_options["dbtable"] = "<redshift-schema-name>.payment_type_dim"
    glueContext.write_dynamic_frame.from_options(
        payment_type_dynamic_frame,
        connection_type="redshift",
        connection_options=redshift_payment_type_options
    )

    
    redshift_rate_code_options = redshift_options.copy()
    redshift_rate_code_options["dbtable"] = "<redshift-schema-name>.rate_code_dim"
    glueContext.write_dynamic_frame.from_options(
        rate_code_dynamic_frame,
        connection_type="redshift",
        connection_options=redshift_rate_code_options
    )

    
    
    # Return the DynamicFrame (for Redshift, you return a single DynamicFrame)
    #After all the writes to Redshift are completed, the function returns the fact_dynamic_frame. This is done in case the fact table data is needed further in the pipeline or for debugging purposes. If you don’t need to use the fact_dynamic_frame again, this return statement can be skipped.
    return dynamic_frame



'''