In [70]:
import re
import os
import json
import random
import string
import argparse
import datetime
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

In [11]:
spark = SparkSession.builder\
                    .appName("DataMaskingScript")\
                    .getOrCreate()
spark

In [27]:
df_taxi_green = spark.read.parquet("/home/Spark/data/raw/green/2021/*")
df_taxi_yellow = spark.read.parquet("/home/Spark/data/raw/yellow/2021/*")

In [26]:
df_taxi_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
common_columns = []

In [34]:
df_taxi_green = df_taxi_green.withColumnRenamed("lpep_pickup_datetime","pickup_datetime")\
                             .withColumnRenamed("lpep_dropoff_datetime","dropoff_datetime")

df_taxi_yellow = df_taxi_yellow.withColumnRenamed("tpep_pickup_datetime","pickup_datetime")\
                             .withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")

In [35]:
(set(df_taxi_green.columns)) & (set(df_taxi_yellow.columns))

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [41]:
commn_columns = [] 
for column in df_taxi_green.columns:
    yellow_columns = set(df_taxi_yellow.columns)
    if column in yellow_columns:
        commn_columns.append(column)
    else:
        pass

In [52]:
commn_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [47]:
df_common_green = df_taxi_green.select(commn_columns)\
             .withColumn("taxiType", F.lit("green"))
df_common_yellow = df_taxi_yellow.select(commn_columns)\
             .withColumn("taxiType", F.lit("yellow"))

In [50]:
df_trips_data = df_common_green.unionAll(df_common_yellow)

In [51]:
df_trips_data.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- taxiType: string (nullable = false)



In [54]:
df_trips_data.groupby("taxiType").count().show()

+--------+--------+
|taxiType|   count|
+--------+--------+
|   green| 1068755|
|  yellow|30904308|
+--------+--------+



In [55]:
df_trips_data.createOrReplaceTempView("trips_data")

In [58]:
spark.sql("""
select 
    count(*) 
from 
    trips_data
group by 
    taxiType

""").show()

+--------+
|count(1)|
+--------+
| 1068755|
|30904308|
+--------+



In [63]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    taxiType, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [64]:
df_result.coalesce(1).write.parquet("/home/Spark/data/revenue/")

In [69]:
def contains_chars(char):
    pattern = re.compile(r'[A-Za-z0-9\u0600-\u06FF]')
    return bool(pattern.search(char))

def mask_string(value):
    if isinstance(value, str):
        if len(value) < 5:
            value = value + value[-2:]
        for char in value:
            if char in list(mapping_dict.keys()):
                value = value.replace(char, mapping_dict[char])
            else: 
                value = value.replace(char, list(mapping_dict.keys())[-1])
    return valuedf_result.schema["revenue_monthly_fare"].dataType


def mask_nested_structure(df, column_name):
    schema = df.schema[column_name].dataType

    if isinstance(schema, ArrayType):
        element_type = schema.elementType
        if isinstance(element_type, StructType):
            for field in element_type.fields:
                nested_col_name = f"{column_name}.{field.name}"
                df = mask_nested_structure(df, nested_col_name)
        elif isinstance(element_type, StringType):
            df = df.withColumn(column_name, F.expr(f"transform({column_name}, x -> {mask_string_udf(x)})"))

    elif isinstance(schema, StructType):
        for field in schema.fields:
            nested_col_name = f"{column_name}.{field.name}"
            df = mask_nested_structure(df, nested_col_name)

    return df

In [67]:
df_companies = spark.read.json("/home/Spark/data/CompqniesData.json")

In [68]:
df_companies.printSchema()

root
 |-- goods: struct (nullable = true)
 |    |-- customers: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- orders: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- orderId: long (nullable = true)
 |    |    |    |-- orderTotal: double (nullable = true)
 |    |    |    |-- shipped: struct (nullable = true)
 |    |    |    |    |-- orderItems: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- itemName: string (nullable = true)
 |    |    |    |    |    |    |-- itemQty: long (nullable = true)
 |    |-- trade: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
 |-- satellites: array (nullable = true)
 |    |-- element: string (containsNull = true)

