In [3]:
import click
import pandas as pd
from time import time
import os
from google.cloud import storage
import pyspark
import psycopg2
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
from dotenv import load_dotenv
from io import BytesIO
from sqlalchemy import create_engine

In [4]:
#@click.command() # click commands instead of argparse.ArgumentParser()... or sys.argv[n]
#@click.option('--sa_path', help='Path to the service account json file')
#@click.option('--project_id', help='Project ID of your GCP project')
#@click.option('--year', default=2021, help='Year to download')
#@click.option('--bucket', help='Name of the bucket to upload the data')
#@click.option('--color', help='Str of the taxi-color for which data should be extracted')
#@click.option('--month', help='Int of the month to summarize the data for')

# Pipeline configuration (stand-in for the commented-out click options above).
# Path to the GCP service-account JSON key file, relative to the notebook.
sa_path = '../mle-neue-fische-gunnaroeh-0fc41b31bc57.json'
# GCP project ID (previously assigned twice with the same value).
project_id = 'mle-neue-fische-gunnaroeh'
# Name of the GCS bucket that holds the taxi parquet files.
bucket = "01_data_pipeline_project"
# Slice of the data to process: taxi color, year and month.
color = "green"
year = 2021
month = 1

In [5]:
def extract_data(sa_path, bucket, color, year, month):
    """Download one month of taxi trip data from GCS and load it into Spark.

    The parquet file ``ny_taxi/{color}_tripdata_{year}-{month:02d}.parquet`` is
    downloaded from the given bucket into memory, read with pandas, converted
    to a Spark DataFrame, and every double column is cast to float.

    Args:
        sa_path: Path to the service-account JSON key file. It is exported as
            ``GOOGLE_APPLICATION_CREDENTIALS`` for the rest of the process.
        bucket: Name of the GCS bucket to read from.
        color: Taxi color used in the file name (e.g. ``"green"``).
        year: Year used in the file name.
        month: Month as an int; zero-padded to two digits in the file name.

    Returns:
        Tuple ``(df_taxi, spark)``: the Spark DataFrame and the SparkSession
        that created it.

    Raises:
        FileNotFoundError: If the expected parquet object is not in the bucket.
    """
    # Create (or reuse) a local Spark session named after the requested slice
    spark = SparkSession.builder \
        .master("local") \
        .appName(f"Pipe-{color}_taxi_{year}-{month:02d}") \
        .getOrCreate()
    # Object name of the parquet file inside the bucket
    file_name = f"ny_taxi/{color}_tripdata_{year}-{month:02d}.parquet"

    # The GCS client picks its credentials up from this environment variable
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = sa_path
    client = storage.Client()

    # Use a separate name so the `bucket` argument (a string) is not rebound
    gcs_bucket = client.get_bucket(bucket)
    blob = gcs_bucket.get_blob(file_name)
    if blob is None:
        # get_blob returns None for a missing object; fail with a clear message
        # instead of an AttributeError on the download call below
        raise FileNotFoundError(
            f"Object '{file_name}' not found in bucket '{bucket}'"
        )

    # Download the parquet file and wrap the bytes in a file-like object
    pq_taxi = BytesIO(blob.download_as_bytes())

    # Read the in-memory parquet as pandas, then hand it over to Spark
    df_pandas = pd.read_parquet(pq_taxi)
    # errors="ignore": do not fail on files that have no ehail_fee column
    df_pandas = df_pandas.drop(columns="ehail_fee", errors="ignore")
    df_taxi = spark.createDataFrame(df_pandas)

    # Cast every double column to float; other columns are left untouched
    for field in df_taxi.schema.fields:
        if isinstance(field.dataType, types.DoubleType):
            df_taxi = df_taxi.withColumn(field.name, F.col(field.name).cast('float'))

    return df_taxi, spark

In [6]:
# Run the extract step for the configured bucket, taxi color, year and month.
df_taxi, spark = extract_data(
    sa_path=sa_path,
    bucket=bucket,
    color=color,
    year=year,
    month=month,
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/10 15:10:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Preview the first five rows of the extracted trip data.
df_taxi.show(n=5)

23/07/10 15:10:30 WARN TaskSetManager: Stage 0 contains a task of very large size (10889 KiB). The maximum recommended task size is 1000 KiB.
23/07/10 15:10:35 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-01-01 00:15:56|  2021-01-01 00:19:52|                 N|       1.0|          43|         151|            1.0|         1.01|        5.5|  0.5|    0.5|       0.0|         0.0|    

In [8]:
def transform_data(df_taxi, spark):
    """Aggregate trip-level taxi data into revenue figures per zone and day.

    The pickup/dropoff columns are renamed to color-independent names, the
    result is registered as the temporary view ``df_taxi_temp``, and a SQL
    query groups it by pickup location, day and month. Monetary columns are
    summed; ``passenger_count`` and ``trip_distance`` are averaged.

    Args:
        df_taxi: Spark DataFrame with the trip columns referenced in the query.
        spark: SparkSession used to run the SQL query.

    Returns:
        Spark DataFrame with one row per (revenue_zone, revenue_day, month).
    """
    # withColumnRenamed is a no-op when the source column is absent, so the
    # lpep_ (green) and tpep_ (yellow) datetime prefixes can share one map.
    column_renames = {
        'lpep_pickup_datetime': 'pickup_datetime',
        'lpep_dropoff_datetime': 'dropoff_datetime',
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'PULocationID': 'pickup_location_id',
        'DOLocationID': 'dropoff_location_id',
    }
    for old_name, new_name in column_renames.items():
        df_taxi = df_taxi.withColumnRenamed(old_name, new_name)

    # Temporary view to be queried; createOrReplaceTempView replaces the
    # deprecated registerTempTable
    df_taxi.createOrReplaceTempView('df_taxi_temp')

    # Query the daily revenue per pickup zone
    df_result = spark.sql("""
                    SELECT pickup_location_id AS revenue_zone,
                    date_trunc('day', pickup_datetime) AS revenue_day,
                    date_trunc('month', pickup_datetime) AS month,  
                    SUM(fare_amount) AS revenue_daily_fare,
                    SUM(extra) AS revenue_daily_extra,
                    SUM(mta_tax) AS revenue_daily_mta_tax,
                    SUM(tip_amount) AS revenue_daily_tip_amount,
                    SUM(tolls_amount) AS revenue_daily_tolls_amount,
                    SUM(improvement_surcharge) AS revenue_daily_improvement_surcharge,
                    SUM(total_amount) AS revenue_daily_total_amount,
                    SUM(congestion_surcharge) AS revenue_daily_congestion_surcharge,
                    AVG(passenger_count) AS avg_daily_passenger_count,
                    AVG(trip_distance) AS avg_daily_trip_distance
                    FROM df_taxi_temp
                    GROUP BY revenue_zone, revenue_day, month;
                      """)
    return df_result

In [9]:
# Run the transform step on the extracted trip data.
df_transformed = transform_data(df_taxi=df_taxi, spark=spark)



In [10]:
# Preview the first five rows of the aggregated revenue table.
df_transformed.show(n=5)

23/07/10 15:10:37 WARN TaskSetManager: Stage 1 contains a task of very large size (10889 KiB). The maximum recommended task size is 1000 KiB.
[Stage 1:>                                                          (0 + 1) / 1]

+------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------------+--------------------------+-----------------------------------+--------------------------+----------------------------------+-------------------------+-----------------------+
|revenue_zone|        revenue_day|              month|revenue_daily_fare|revenue_daily_extra|revenue_daily_mta_tax|revenue_daily_tip_amount|revenue_daily_tolls_amount|revenue_daily_improvement_surcharge|revenue_daily_total_amount|revenue_daily_congestion_surcharge|avg_daily_passenger_count|avg_daily_trip_distance|
+------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------------+--------------------------+-----------------------------------+--------------------------+----------------------------------+-------------------------+-----------------------+
|          68|2021-01-09 00:00:00|2021-01-01 00:00:0

                                                                                

In [11]:
### 5. L: Load Data onto local Machine PostgreSQL
def load_data(df_transformed, spark, color, year, month):
    """Write the aggregated revenue table to PostgreSQL via pandas/SQLAlchemy.

    Connection settings are read from the environment after loading a ``.env``
    file: ``USER``, ``PASSWORD``, ``HOST``, ``PORT``, ``DB`` and ``SCHEMA``.
    The Spark DataFrame is collected to pandas and written to the table
    ``{color}_revenue_{year}_{month}``, replacing an existing table of that name.

    Args:
        df_transformed: Spark DataFrame to persist; collected with ``toPandas``.
        spark: Not used by this function; kept so existing calls keep working.
        color: Taxi color, first part of the table name.
        year: Year, part of the table name.
        month: Month, last part of the table name (not zero-padded).

    Raises:
        ValueError: If ``USER``, ``HOST``, ``PORT`` or ``DB`` is not set.
    """
    # override=True lets the .env values win over variables that already exist
    # in the process environment. Without it, a pre-set USER (normally the
    # login name on Unix) would shadow the database user from .env.
    load_dotenv(override=True)

    # Get the database credentials
    user = os.getenv('USER')
    pw = os.getenv('PASSWORD')
    host = os.getenv('HOST')
    port = os.getenv('PORT')
    db = os.getenv('DB')
    schema = os.getenv('SCHEMA')

    # Fail early instead of building a URL that contains the string "None"
    required = (('USER', user), ('HOST', host), ('PORT', port), ('DB', db))
    missing = [name for name, value in required if not value]
    if missing:
        raise ValueError(
            "Missing database settings in environment/.env: " + ", ".join(missing)
        )

    # Values are interpolated verbatim into the connection URL.
    # Writing goes through pandas because Spark's JDBC writer was not working
    # in this setup.
    engine = create_engine(f'postgresql://{user}:{pw}@{host}:{port}/{db}')

    table_name = f"{color}_revenue_{year}_{month}"
    try:
        # Collect to the driver and write the DataFrame to PostgreSQL
        df_pandas = df_transformed.toPandas()
        df_pandas.to_sql(name=table_name, con=engine, schema=schema,
                         if_exists="replace", index=False)
    finally:
        # Release the engine's pooled connections even if the write fails
        engine.dispose()

In [12]:
# Run the load step: write the aggregated table to PostgreSQL.
load_data(
    df_transformed=df_transformed,
    spark=spark,
    color=color,
    year=year,
    month=month,
)

ModuleNotFoundError: No module named 'psycopg2'