# Clean Taxi Datasets

In [12]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
# Maven coordinates Spark has to resolve at launch: Delta Lake core and the Hadoop AWS (S3A) module.
packages = "io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0"

# Build the submit arguments from `packages` so the coordinate list lives in exactly one place.
# This must be set before the SparkSession is created for the packages to be picked up.
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages {packages} pyspark-shell"

In [4]:
# Object-store credentials are taken from the environment so that no secret is stored in the notebook.
# A missing variable raises KeyError here, before any Spark session is started.
s3_access_key = os.environ["AWS_ACCESS_KEY_ID"]
s3_secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]

# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Jupyter Testing Clean Data")
    # Cluster and executor sizing.
    .config("spark.master", "spark://spark-master:7077")
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "4g")
    # Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A file system settings; the endpoint host is named "minio", so presumably a MinIO server
    # reached over plain HTTP with path-style bucket addressing.
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", "minio:9000")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.maximum", "50")
    # Hive metastore and warehouse location.
    # NOTE(review): the metastore is addressed by a fixed IP; a hostname would survive re-deployment.
    .config("spark.hadoop.metastore.catalog.default", "hive")
    .config("spark.sql.warehouse.dir", "s3a://storage/warehouse")
    .config("spark.hive.metastore.uris", "thrift://172.30.0.4:9083")
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
# Load every column of the pre-2015 yellow-taxi table from the `default` database as a DataFrame.
yellow_raw = spark.sql(
    """select * from default.yellow_taxi_pre2015"""
)

In [6]:
# Inspect the raw table's schema before cleaning; the output below shows every column typed as string.
yellow_raw.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [16]:
# List the distinct raw passenger_count values to check what the column holds before it is cast to integer.
(
    yellow_raw
    .select("passenger_count")
    .distinct()
    .collect()
)

[Row(passenger_count='7'),
 Row(passenger_count='3'),
 Row(passenger_count='8'),
 Row(passenger_count='0'),
 Row(passenger_count='5'),
 Row(passenger_count='208'),
 Row(passenger_count='6'),
 Row(passenger_count='9'),
 Row(passenger_count='1'),
 Row(passenger_count='4'),
 Row(passenger_count='2')]

In [19]:
# Give the all-string raw columns proper types.
# Every column expression is collected first and applied in ONE select(): each withColumn()
# call adds its own projection to the plan, which Spark's documentation advises against
# when many columns are changed. The resulting schema and column order are unchanged:
# existing columns keep their position, `surcharge` is removed, and the two new columns
# are appended at the end.

# Columns parsed as timestamps (name unchanged).
yellow_timestamp_cols = {'pickup_datetime', 'dropoff_datetime'}

# Columns cast in place (name unchanged) -> target Spark SQL type.
# NOTE(review): 'float' is 32-bit; coordinates with six decimal places may lose precision - consider 'double'.
yellow_cast_types = {
    'passenger_count': 'integer',
    'trip_distance': 'float',
    'pickup_longitude': 'float',
    'pickup_latitude': 'float',
    'dropoff_longitude': 'float',
    'dropoff_latitude': 'float',
    'payment_type': 'integer',
    'fare_amount': 'float',
    'mta_tax': 'float',
    'tip_amount': 'float',
    'tolls_amount': 'float',
    'total_amount': 'float',
}

yellow_typed_cols = []
for col_name in yellow_raw.columns:
    if col_name == 'surcharge':
        # Not carried over under its own name; re-emitted below as improvement_surcharge.
        continue
    if col_name in yellow_timestamp_cols:
        yellow_typed_cols.append(F.to_timestamp(col_name).alias(col_name))
    elif col_name in yellow_cast_types:
        yellow_typed_cols.append(F.col(col_name).cast(yellow_cast_types[col_name]).alias(col_name))
    else:
        # vendor_id, rate_code, store_and_fwd_flag (and anything unexpected) pass through untouched.
        yellow_typed_cols.append(F.col(col_name))

yellow_processed = yellow_raw.select(
    *yellow_typed_cols,
    # NOTE(review): rate_code_id is an uncast string copy and the original rate_code column is kept
    # as well, unlike surcharge, which is dropped after being renamed - confirm this is intended.
    F.col('rate_code').alias('rate_code_id'),
    F.col('surcharge').cast('float').alias('improvement_surcharge'),
)

In [20]:
# Check the schema after the type conversion: the casts should show up and `surcharge` should be gone.
yellow_processed.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- rate_code_id: string (nullable = true)
 |-- improvement_surcharge: float (nullable = true)



In [15]:
# Fetch ten pickup_datetime values to confirm the strings were parsed into timestamps.
(
    yellow_processed
    .select('pickup_datetime')
    .take(10)
)

[Row(pickup_datetime=datetime.datetime(2013, 8, 15, 1, 49, 36)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 16, 7, 34, 24)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 14, 7, 27, 21)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 14, 11, 39, 31)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 18, 1, 17, 48)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 13, 13, 37, 45)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 12, 0, 1, 3)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 14, 8, 30, 4)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 14, 8, 3, 47)),
 Row(pickup_datetime=datetime.datetime(2013, 8, 14, 6, 30, 56))]

In [21]:
# Stop the Spark session now that the checks are done; any cell run after this needs a new session.
spark.stop()