In [1]:
# Install PySpark into the notebook environment.
# NOTE(review): the install is unpinned and uses `!pip`; consider pinning the
# version and using `%pip` so re-runs are reproducible.
!pip install pyspark

from pyspark.sql import SparkSession

# Initialize Spark session
# The hadoop-aws package is requested so that the s3a:// paths used below can
# be resolved (presumably it must match the cluster's Hadoop version; verify).
# NOTE(review): datetimeRebaseModeInWrite=LEGACY affects how Parquet date/time
# values are written; confirm it is still required.
spark = SparkSession.builder \
    .appName("Inventory ETL") \
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
    .getOrCreate()

# Set the necessary AWS credentials
# These point the S3A connector at the "minio:9000" endpoint.
# NOTE(review): access key and secret are hard-coded; consider reading them
# from the environment instead.
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minio")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "minio:9000")

# Set the path to the JSON file
# The glob matches every *.json object under partition=0 of the customers topic directory.
get_users_file = "s3a://warehouse/inventory/dbserver1.inventory.customers/partition=0/*.json"

# Read the JSON file
# Each record becomes one row; nested objects (e.g. "after", "source") are read as structs.
raw_data = spark.read \
    .format("json") \
    .option("inferSchema", "true") \
    .json(get_users_file)

# Display raw data
raw_data.show()


Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.5
+--------------------+------+---+--------------------+-----------+-------------+
|               after|before| op|              source|transaction|        ts_ms|
+--------------------+------+---+--------------------+-----------+-------------+
|{100.0, Ford, 201...|  null|  r|{mysql, inventory...|       null|1679965843817|
+--------------------+------+---+--------------------+-----------+-------------+



In [2]:
# Process the data: lift the fields nested under the "after" struct of each
# raw record into top-level columns of the silver table.
_AFTER_FIELDS = (
    "id",
    "plate_number",
    "car_make",
    "car_year",
    "owner_name",
    "owner_address",
    "owner_phone_number",
    "subscription_status",
    "subscription_start",
    "subscription_end",
    "balance",
    "timestamp",
)
silver_data = raw_data.select(*[f"after.{field_name}" for field_name in _AFTER_FIELDS])

In [3]:
# Persist the silver table as Parquet; mode="overwrite" replaces whatever is
# already stored at this path.
silver_data.write.parquet("s3a://warehouse/inventory/silver_data", mode="overwrite")

In [4]:
# Rebind silver_data to the copy stored at the silver Parquet location, so the
# following cells work from the persisted data.
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")

In [5]:
# Preview the silver table to check the flattened columns.
silver_data.show()

+--------------------+------------+--------+--------+----------+--------------------+------------------+-------------------+------------------+----------------+-------+--------------------+
|                  id|plate_number|car_make|car_year|owner_name|       owner_address|owner_phone_number|subscription_status|subscription_start|subscription_end|balance|           timestamp|
+--------------------+------------+--------+--------+----------+--------------------+------------------+-------------------+------------------+----------------+-------+--------------------+
|5a5c562e-4386-44a...|    7695-OOO|    Ford|    2012|    Stefen|92834 Kim Unions\...|      +14385064453|             active|              null|            null|  100.0|2023-03-03T14:37:49Z|
+--------------------+------------+--------+--------+----------+--------------------+------------------+-------------------+------------------+----------------+-------+--------------------+



In [6]:
# Install the SMS client (twilio) and the MySQL driver used by the cells below.
# NOTE(review): both installs are unpinned; consider pinning versions and using
# `%pip` so the packages land in the kernel's environment reproducibly.
!pip install twilio
!pip install mysql-connector-python

Collecting twilio
  Downloading twilio-7.17.0-py2.py3-none-any.whl (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
Installing collected packages: twilio
Successfully installed twilio-7.17.0
Collecting mysql-connector-python
  Downloading mysql_connector_python-8.0.32-cp38-cp38-manylinux1_x86_64.whl (23.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.5/23.5 MB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting protobuf<=3.20.3,>=3.11.0
  Downloading protobuf-3.20.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: protobuf, mysql-connector-python
  Attempting uninstall: protobuf
    Found existing installation: protobuf 4.21.7
    Uninstalling protobuf-4.2

In [7]:
from datetime import datetime as dt, timedelta, timezone
import pytz
from twilio.rest import Client
from pyspark.sql import Row
from datetime import datetime, timezone
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
import datetime
import mysql.connector
from typing import Optional

# Additional imports
from mysql.connector import Error

import os

# Twilio settings are read from the environment so that no secret is stored in
# the notebook. The previous single-line form had the three assignments fused
# into one broken string literal and did not parse.
# A missing variable raises KeyError here, before any SMS is attempted.
TWILIO_ACCOUNT_SID = os.environ["TWILIO_ACCOUNT_SID"]
TWILIO_AUTH_TOKEN = os.environ["TWILIO_AUTH_TOKEN"]
TWILIO_PHONE_NUMBER = os.environ["TWILIO_PHONE_NUMBER"]  # sender number used by send_sms

# Shared Twilio REST client used by send_sms.
client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)

# Work from the persisted silver table.
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")

def get_rate_for_customer(timestamp, subscription_status):
    """Return the fee charged for a single passage.

    Args:
        timestamp: Object exposing an integer ``hour`` attribute (for example
            a ``datetime``). Only read when the subscription is active.
        subscription_status: Status string of the customer. Only ``'active'``
            receives time-of-day pricing.

    Returns:
        ``9.99`` for any status other than ``'active'``. For active customers:
        ``2.99`` for hours 0-5 and 11-15, ``3.99`` for hours 6-10 and 16-23.
        ``0.0`` only if ``hour`` lies outside 0-23.
    """
    if subscription_status != 'active':
        return 9.99

    hour = timestamp.hour
    if 0 <= hour < 6 or 11 <= hour < 16:
        return 2.99
    # The upper bound is 24, not 23: with "< 23" a passage at 23:xx matched no
    # band and fell through to the 0.0 default, i.e. it was free of charge.
    if 6 <= hour < 11 or 16 <= hour < 24:
        return 3.99

    # Defensive default so callers never receive None for an out-of-range hour.
    return 0.0


def is_subscription_active(subscription_start: dt, subscription_end: dt, current_time: dt) -> bool:
    """Tell whether ``current_time`` lies within the subscription window.

    Both ends of the window are inclusive.
    """
    if current_time < subscription_start:
        return False
    return current_time <= subscription_end

def get_subscription_status(subscription_end: dt, current_time: dt) -> bool:
    """Tell whether ``current_time`` is no later than seven days after ``subscription_end``.

    The seven-day allowance acts as a grace period; its last instant is
    still accepted.
    """
    last_accepted_moment = subscription_end + timedelta(days=7)
    return current_time <= last_accepted_moment


def send_sms(phone_number, message):
    """Send ``message`` to ``phone_number`` through the shared Twilio client.

    Best effort: any exception raised while sending is caught and reported
    on stdout rather than propagated to the caller.
    """
    try:
        # Assembled inside the try so that every failure is reported uniformly.
        sms_fields = {
            "body": message,
            "from_": TWILIO_PHONE_NUMBER,
            "to": phone_number,
        }
        client.messages.create(**sms_fields)
        print(f"SMS sent to {phone_number}: {message}")
    except Exception as send_error:
        print(f"Error sending SMS: {send_error}")

from pyspark.sql.functions import col

def is_valid_balance(value):
    """Report whether ``value`` can be converted with ``float()``.

    Args:
        value: Candidate balance: a number, a numeric string, or ``None``.

    Returns:
        ``True`` when ``float(value)`` succeeds, ``False`` otherwise.
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers None and other non-numeric objects (e.g. a null
        # balance column), which previously escaped and failed the caller.
        return False
    return True

# Wrap the Python validator as a Spark UDF that yields a boolean column.
valid_balance_udf = udf(is_valid_balance, BooleanType())

# Keep only the rows whose "balance" passes is_valid_balance.
silver_data = silver_data.filter(valid_balance_udf(col("balance")))

# Database configuration
# Passed as keyword arguments to mysql.connector.connect() by update_customer_balance.
# NOTE(review): the root password is hard-coded; consider reading it from the
# environment instead.
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory"
}

def update_customer_balance(customer_id, new_balance):
    """Store ``new_balance`` for ``customer_id`` in the ``customers`` table.

    Opens a fresh connection using ``db_config``, runs a parameterised UPDATE
    and commits. MySQL connector errors are reported on stdout and not
    re-raised.

    Args:
        customer_id: Value matched against ``customers.id``.
        new_balance: Value written to ``customers.balance``.
    """
    # Pre-bind both handles so the cleanup below is safe even when connect()
    # or cursor() raises; otherwise the finally block would fail with an
    # UnboundLocalError and hide the original problem.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        update_query = "UPDATE customers SET balance = %s WHERE id = %s"
        cursor.execute(update_query, (new_balance, customer_id))
        connection.commit()
        print(f"Updated balance for customer {customer_id}: {new_balance}")
    except Error as e:
        print(f"Error updating balance: {e}")
    finally:
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()

from datetime import datetime, timezone

def safe_date_conversion(date_string: Optional[str]) -> dt:
    """Parse an ISO-8601 timestamp string into a timezone-aware UTC datetime.

    Args:
        date_string: Text such as ``"2023-03-03T14:37:49Z"``. A trailing
            ``Z`` is optional; an explicit UTC offset is honoured. ``None``
            and non-string values are accepted.

    Returns:
        The parsed moment expressed in UTC. Values without an offset are
        taken to be UTC. ``1970-01-01T00:00:00+00:00`` is returned when the
        input is missing, not a string, or cannot be parsed.
    """
    epoch = dt(1970, 1, 1, tzinfo=timezone.utc)
    if date_string is None or not isinstance(date_string, str):
        return epoch

    # Drop the "Z" suffix only when it is really there. Unconditionally
    # cutting the last character turned valid strings without the suffix
    # into unparsable ones, which then silently became the epoch.
    text = date_string[:-1] if date_string.endswith("Z") else date_string
    try:
        parsed = dt.fromisoformat(text)
    except ValueError:
        return epoch

    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    # The string carried its own offset: convert instead of overwriting it.
    return parsed.astimezone(timezone.utc)

def process_plate(row: Row) -> None:
    """Charge one passage to the customer in ``row`` and notify them by SMS.

    The rate is derived from the passage timestamp and the stored
    subscription status, subtracted from the current balance, and the new
    balance is written back to the database.

    Args:
        row: Silver-table row providing ``plate_number``, ``timestamp``,
            ``subscription_start``, ``subscription_end``,
            ``subscription_status``, ``balance``, ``owner_name``,
            ``owner_phone_number`` and ``id``.
    """
    print(f"Processing plate: {row.plate_number}")
    current_time = dt.now(timezone.utc)

    # Shared parser: yields the epoch for a missing or malformed timestamp, so
    # a null value no longer raises an uncaught TypeError from slicing.
    plate_timestamp = safe_date_conversion(row.timestamp)
    subscription_start = safe_date_conversion(row.subscription_start)
    subscription_end = safe_date_conversion(row.subscription_end)

    is_active = is_subscription_active(subscription_start, subscription_end, current_time)
    rate = get_rate_for_customer(plate_timestamp, row.subscription_status)

    balance = float(row.balance)
    new_balance = balance - rate

    if row.subscription_status == 'none':
        message = f"Dear {row.owner_name}, your car with plate number {row.plate_number} is not registered. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif is_active:  # decided from the subscription dates, not the stored status string
        message = f"Dear {row.owner_name}, your subscription is active. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif not get_subscription_status(subscription_end, current_time):
        message = f"Dear {row.owner_name}, your subscription has expired. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)

    # Every message above reports the rate as charged, so the balance is
    # persisted for every passage. Previously this call sat inside the
    # "expired" branch and the other cases were never written back.
    update_customer_balance(row.id, new_balance)

# Run process_plate once for every row of the silver table.
# NOTE(review): foreach executes on Spark workers, so the print output, SMS
# sends and database writes presumably happen there, not in the driver; verify.
silver_data.foreach(process_plate)

In [8]:
"""
sample_data = Row(
    id='5a5c562e-4386-44ad-bf6f-bab91081781e',
    plate_number='7695-OOO',
    car_make='Ford',
    car_year=2012,
    owner_name='Becky Smith',
    owner_address='92834 Kim Unions\nPort Harryport, MD 61729',
    owner_phone_number='+14354123654',
    subscription_status='none',
    subscription_start=None,
    subscription_end=None,
    balance=100.0,  # Replace 'Exc=' with a valid float value
    timestamp='2023-03-03T14:37:49Z',
    rate=9.99
)

process_plate(sample_data)
"""

"\nsample_data = Row(\n    id='5a5c562e-4386-44ad-bf6f-bab91081781e',\n    plate_number='7695-OOO',\n    car_make='Ford',\n    car_year=2012,\n    owner_name='Becky Smith',\n    owner_address='92834 Kim Unions\nPort Harryport, MD 61729',\n    owner_phone_number='+14385064453',\n    subscription_status='none',\n    subscription_start=None,\n    subscription_end=None,\n    balance=100.0,  # Replace 'Exc=' with a valid float value\n    timestamp='2023-03-03T14:37:49Z',\n    rate=9.99\n)\n\nprocess_plate(sample_data)\n"

In [9]:
# Gold table: number of silver rows per subscription status.
gold_data = silver_data.groupBy("subscription_status").count()

# Print the per-status counts.
gold_data.show()

# Persist the counts as Parquet, replacing any previous output at this path.
# NOTE(review): the metrics cell below uses this same location as the root
# for its Delta tables; confirm sharing the path is intended.
gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite")


import pyspark.sql.functions as F
from pyspark.sql import SparkSession

class MetricsAdapter:
    """Aggregate silver-table passages into time-bucketed gold metrics.

    ``transform`` writes five Delta tables (daily, weekly, monthly, quarterly,
    yearly) below ``<warehouse_path>/gold/``; ``show_metrics`` reads them back
    and prints the first rows of each. Reading relies on the module-level
    ``spark`` session.

    Args:
        silver_table: DataFrame with a string ``timestamp`` column whose
            characters 12-13 hold the hour (e.g. ``2023-03-03T14:37:49Z``).
        warehouse_path: Root location under which the ``gold`` tables live.
    """

    # Revenue estimate per passage: hours before NOON_HOUR are valued at
    # RATE_BEFORE_NOON, every other hour (and a null timestamp) at RATE_FROM_NOON.
    NOON_HOUR = 12
    RATE_BEFORE_NOON = 2.99
    RATE_FROM_NOON = 3.99

    # (heading printed by show_metrics, directory name below <warehouse_path>/gold/)
    _METRIC_TABLES = (
        ("Daily Metrics:", "daily_metrics"),
        ("Weekly Metrics:", "weekly_metrics"),
        ("Monthly Metrics:", "monthly_metrics"),
        ("Quarterly Metrics:", "quarterly_metrics"),
        ("Yearly Metrics:", "yearly_metrics"),
    )

    # (directory name, grouping columns) for the roll-ups above daily level.
    _ROLLUPS = (
        ("weekly_metrics", ("year", "week_of_year")),
        ("monthly_metrics", ("year", "month")),
        ("quarterly_metrics", ("year", "quarter")),
        ("yearly_metrics", ("year",)),
    )

    def __init__(self, silver_table, warehouse_path):
        self.silver_table = silver_table
        self.warehouse_path = warehouse_path

    def _metric_path(self, table_name):
        """Return the storage location of one gold metric table."""
        return self.warehouse_path + '/gold/' + table_name

    def _write_metric(self, frame, table_name):
        """Overwrite one gold metric table with ``frame`` in Delta format."""
        frame.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self._metric_path(table_name))

    def _revenue_per_passage(self):
        """Return the column expression pricing one passage by its hour."""
        hour = self.silver_table.timestamp.substr(12, 2).cast("int")
        return F.when(hour < self.NOON_HOUR, self.RATE_BEFORE_NOON).otherwise(self.RATE_FROM_NOON)

    def show_metrics(self):
        """Print the first five rows of every gold metric table."""
        # Load every table before printing anything, so a missing table fails
        # up front instead of after partial output.
        loaded = [
            (heading, spark.read.format('delta').load(self._metric_path(table_name)))
            for heading, table_name in self._METRIC_TABLES
        ]
        for heading, metrics in loaded:
            print(heading)
            metrics.show(5)

    def transform(self):
        """Compute all metric tables from ``silver_table`` and store them."""
        # Calculate the date, week, month, quarter, and year from the timestamp.
        # Each row counts as one passage with a time-of-day revenue estimate.
        # NOTE(review): "year" is the calendar year while "week_of_year" comes
        # from weekofyear(); rows around New Year may group unexpectedly.
        time_based_metrics = (
            self.silver_table
            .withColumn("date", F.to_date("timestamp"))
            .withColumn("year", F.year("timestamp"))
            .withColumn("quarter", F.quarter("timestamp"))
            .withColumn("month", F.month("timestamp"))
            .withColumn("week_of_year", F.weekofyear("timestamp"))
            .withColumn("total_passages", F.lit(1))
            .withColumn("total_revenue", self._revenue_per_passage())
        )

        # Daily metrics: one row per calendar date.
        daily_metrics = time_based_metrics.groupBy("date").agg(
            F.count("*").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue"),
        )
        self._write_metric(daily_metrics, "daily_metrics")

        # Weekly, monthly, quarterly and yearly metrics share one shape and
        # differ only in their grouping columns.
        for table_name, group_columns in self._ROLLUPS:
            rollup = time_based_metrics.groupBy(*group_columns).agg(
                F.sum("total_passages").alias("total_passages"),
                F.sum("total_revenue").alias("total_revenue"),
            )
            self._write_metric(rollup, table_name)

# Example usage
# getOrCreate() returns the session that is already running, if there is one.
spark = SparkSession.builder.getOrCreate()
# Input: the persisted silver table.
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")
# Root location; MetricsAdapter stores its tables below "<warehouse_path>/gold/".
warehouse_path = "s3a://warehouse/inventory/gold_data"
metrics_adapter = MetricsAdapter(silver_data, warehouse_path)
# Compute and store all metric tables ...
metrics_adapter.transform()

# ... then read them back and print the first rows of each.
metrics_adapter.show_metrics()

+-------------------+-----+
|subscription_status|count|
+-------------------+-----+
|             active|    1|
+-------------------+-----+

Daily Metrics:
+----------+--------------+-------------+
|      date|total_passages|total_revenue|
+----------+--------------+-------------+
|2023-03-03|             1|         3.99|
+----------+--------------+-------------+

Weekly Metrics:
+----+------------+--------------+-------------+
|year|week_of_year|total_passages|total_revenue|
+----+------------+--------------+-------------+
|2023|           9|             1|         3.99|
+----+------------+--------------+-------------+

Monthly Metrics:
+----+-----+--------------+-------------+
|year|month|total_passages|total_revenue|
+----+-----+--------------+-------------+
|2023|    3|             1|         3.99|
+----+-----+--------------+-------------+

Quarterly Metrics:
+----+-------+--------------+-------------+
|year|quarter|total_passages|total_revenue|
+----+-------+--------------+-------