
In [5]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     317.3/317.3 MB 1.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=a2028a8ba4c84d136f14c7faccda3783c009b3060ea0435877ce0cda505cad85
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


**TASK ONE**

Q1 (Python)

Write a Python script to read a large text file (containing millions of rows) and filter out all the
lines that contain the value 'N/A'.
Then, write the filtered data to a new file.
Make sure your script can

- handle the large size of the input file without running out of memory.

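My Solution =
1. Used Spark instead of Pandas so that the workload is distributed across the cluster's nodes and the file is processed partition by partition rather than loaded into memory all at once.
2. Used show() instead of collect() when inspecting results, so that only a few rows are sent to the driver node.
3. Wrote the data to a separate directory in Snappy-compressed Parquet format, an efficient columnar format for analytical, data-warehouse and queryable data.
4. The Spark session should be stopped at the end to save costs (spark.stop() is commented out in the cell below because Task Two reuses the session).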

- minimize i/o operations

This will be like running cat input.txt | grep -v 'N/A' > output.txt
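The grep comparison above can also be sketched in plain Python, without Spark. This is a minimal sketch rather than the notebook's solution: it streams the file one line at a time, so memory use stays roughly constant regardless of file size, and each line is read once and written at most once. The function name `filter_na_lines` and the sample file contents are hypothetical.

```python
import os
import tempfile


def filter_na_lines(src_path, dst_path, marker="N/A"):
    """Copy src_path to dst_path, skipping every line that contains marker.

    Returns the number of lines written.
    """
    kept = 0
    # Iterating over a file object yields one line at a time, so only a
    # single line (plus the I/O buffers) is held in memory.
    with open(src_path, "r", encoding="utf-8") as src, \
            open(dst_path, "w", encoding="utf-8") as dst:
        for line in src:
            if marker not in line:
                dst.write(line)
                kept += 1
    return kept


# Small demonstration on a temporary file (hypothetical sample data).
demo_dir = tempfile.mkdtemp()
demo_in = os.path.join(demo_dir, "input.txt")
demo_out = os.path.join(demo_dir, "output.txt")
with open(demo_in, "w", encoding="utf-8") as f:
    f.write("1,Alice,Dr. Smith\n2,N/A,Dr. Jones\n3,Bob,N/A\n4,Carol,Dr. Lee\n")

kept_count = filter_na_lines(demo_in, demo_out)
with open(demo_out, encoding="utf-8") as f:
    demo_result = f.read()
```

Python's built-in file buffering already batches the underlying reads and writes, so the loop does not issue one system call per line.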

My Solution=
1. I used Spark's filter method, a narrow transformation that runs in parallel on each partition without shuffling data. Note that isNull() only matches missing values, so the literal string 'N/A' has to be compared explicitly:

filtered_na_df = df.filter(~df["Name"].eqNullSafe("N/A"))


In [8]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("App").getOrCreate()

# Named input_path/output_path to avoid shadowing Python's built-in input()
input_path = "/content/drive/MyDrive/Colab Notebooks/PythonApp_Outsteer/healthcare_dataset.csv"
output_path = "/content/drive/MyDrive/Colab Notebooks/PythonApp_Outsteer/output/filtered_na_lines_df"

df = spark.read.option("header", "true").csv(input_path)

# Filter out rows that contain 'N/A'. eqNullSafe() returns False (not NULL) for
# missing values, so rows with NULLs are kept rather than silently dropped.
has_na = df["Name"].eqNullSafe("N/A") | df["Doctor"].eqNullSafe("N/A")  # add the other relevant columns the same way
filtered_na_df = df.filter(~has_na)

# Save the filtered data to a new location (Parquet, Snappy-compressed by default).
# mode("overwrite") lets the cell be re-run; without it Spark raises
# AnalysisException [PATH_ALREADY_EXISTS] when the output path already exists.
# Delta is often preferred over plain Parquet in production because it adds time travel and a transaction log.
filtered_na_df.write.mode("overwrite").parquet(output_path)


# # Stop the Spark session
# spark.stop()

**TASK TWO**

Q2 (PySpark)

We have data stored in Azure Blob Storage partitioned by tenant name and short_date (YYYY-MM-DD
format). There are two existing tables with this data:

• **Events** – pulled daily by event_time (Columns: event_id, event_time, common_data_1, common_data_2)

• **Updates** – pulled daily by last_modified (Columns: event_id, event_time, last_modified, common_data_1, specific_data_1)

We want to create a new table called **updated_events** that combines the relevant information
from both tables. This new table should include the following columns:

1. event_id (key)
2. event_time (key)
3. last_modified
4. common_data_1
5. common_data_2
6. specific_data_1

Important keynotes:
1. The updated_events table should reflect the **latest update** for that event_id and event_time.
2. Pay attention to **performance** of the spark job.
3. The spark job should **run daily** and fill updated_events with missing event/updates.

Walk us through the process of creating the updated_events table using pyspark, considering the **data partitioning, update logic and backfill strategy.**
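The update logic asked for above can be illustrated in plain Python before translating it to Spark. This is a sketch under stated assumptions, not the notebook's implementation: it assumes that "latest update" means the row with the highest last_modified per (event_id, event_time) key, that events without updates and updates without events are both kept, and that an update's common_data_1 takes precedence over the event's. The function name `build_updated_events` and the sample rows are hypothetical. In PySpark the same rule is commonly expressed with row_number() over a window partitioned by the two keys and ordered by last_modified descending, followed by a full outer join.

```python
def build_updated_events(events, updates):
    """Combine events and updates (lists of dicts) into updated_events rows."""
    # 1. Keep only the latest update per (event_id, event_time) key.
    latest = {}
    for upd in updates:
        key = (upd["event_id"], upd["event_time"])
        if key not in latest or upd["last_modified"] > latest[key]["last_modified"]:
            latest[key] = upd

    # 2. Index events by the same key so both sides can be matched.
    events_by_key = {(e["event_id"], e["event_time"]): e for e in events}

    # 3. Full outer combination: every key seen on either side yields a row.
    rows = []
    for key in sorted(set(events_by_key) | set(latest)):
        ev = events_by_key.get(key, {})
        upd = latest.get(key, {})
        rows.append({
            "event_id": key[0],
            "event_time": key[1],
            "last_modified": upd.get("last_modified"),
            # Prefer the updated value, fall back to the original event.
            "common_data_1": upd.get("common_data_1", ev.get("common_data_1")),
            "common_data_2": ev.get("common_data_2"),
            "specific_data_1": upd.get("specific_data_1"),
        })
    return rows


# Hypothetical sample rows.
sample_events = [
    {"event_id": 1000, "event_time": 1050, "common_data_1": "James", "common_data_2": "abc"},
    {"event_id": 1001, "event_time": 1350, "common_data_1": "Jack", "common_data_2": "abc"},
]
sample_updates = [
    {"event_id": 1000, "event_time": 1050, "last_modified": 2000,
     "common_data_1": "James1", "specific_data_1": "Mathew"},
    {"event_id": 1000, "event_time": 1050, "last_modified": 2100,
     "common_data_1": "James2", "specific_data_1": "Mark"},
    {"event_id": 1001, "event_time": 2300, "last_modified": 2000,
     "common_data_1": "James1", "specific_data_1": "Mathew"},
]
updated_events = build_updated_events(sample_events, sample_updates)
```

With these sample rows, key (1000, 1050) takes the update modified at 2100, the event (1001, 1350) survives with empty update columns, and the update (1001, 2300) survives with an empty common_data_2.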


In [60]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


# Define the schemas manually
eventsSchema = StructType([
            StructField('event_id',IntegerType(),True),
            StructField('event_time', IntegerType(),True),  # IntegerType keeps the demo simple; logically this should be TimestampType()
            StructField('common_data_1',StringType(),True),
            StructField('common_data_2',StringType(),True)])

updatesSchema = StructType([
            StructField('event_id',IntegerType(),True),
            StructField('event_time',IntegerType(),True),
            StructField('last_modified',IntegerType(),True),  # IntegerType to match the integer sample values below
            StructField('common_data_1',StringType(),True),
            StructField('specific_data_1',StringType(),True)
            ])

# Enter the sample data manually

EventsData = [Row(1000,1050, "James", "abc"),
              Row(1001,1350, "Jack", "abc"),
              Row(1002,1750, "John", "anf")]

UpdatesData = [Row(1000,1050,2000, "James1", "Mathew"),
              Row(1001,2300,2000, "James1", "Mathew")]


EDf = spark.createDataFrame(EventsData,eventsSchema)  # Events DataFrame
EDf.createOrReplaceTempView("EventsDf")  # Optional
UDf = spark.createDataFrame(UpdatesData,updatesSchema)  # Updates DataFrame
UDf.createOrReplaceTempView("UpdatesDf")  # Optional

# USING PYSPARK

# Join Events and Updates on both keys. This is an inner join, so only
# events that have a matching update are kept.
Updated_EventsDF = EDf.join(UDf, (EDf["event_id"] == UDf["event_id"]) &
                  (EDf["event_time"] == UDf["event_time"]), "inner")
Updated_EventsDF.show()

# The join result holds event_id, event_time and common_data_1 twice;
# keep one copy of each (common_data_1 taken from Updates) before writing.
Updated_EventsDF = Updated_EventsDF.select(
    EDf["event_id"], EDf["event_time"], UDf["last_modified"],
    UDf["common_data_1"], EDf["common_data_2"], UDf["specific_data_1"])

# DataFrameWriter has no .delta() method; Delta is selected with format("delta")
# and needs the delta-spark package. The path below is a placeholder.
Updated_EventsDF.write.format("delta").partitionBy("common_data_1").save("#Desired Deltalake Location")
# partitionBy() uses common_data_1 because it most probably holds categorical data, not recursive or continuous data

""" WORKFLOW MANAGEMENT INFORMATION

In order to run the notebook daily we have two options -

1. Azure Data Factory as an Orchestration Tool:
    Create an ETL Pipeline, Add the Notebook (as ADF Databricks Activity) that produces the Combined Results in a desired Fashion,
    Set the Schedule Trigger frequency say Daily at 07:00.

2. Databricks Notebook Workflows:

  Databricks Jobs: Create New Job> ScheduleType Schedule>EveryDay at 07:00 Hrs
      Select Task Type> Notebook, NotebookLocation>.., Cluster>SingleNodeLatestSparkVersion,
      Set parameters(), Can set TimeOut/Retries/Alerts

"""



+--------+----------+-------------+-------------+--------+----------+-------------+-------------+---------------+
|event_id|event_time|common_data_1|common_data_2|event_id|event_time|last_modified|common_data_1|specific_data_1|
+--------+----------+-------------+-------------+--------+----------+-------------+-------------+---------------+
|    1000|      1050|        James|          abc|    1000|      1050|         2000|       James1|         Mathew|
+--------+----------+-------------+-------------+--------+----------+-------------+-------------+---------------+
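The daily schedule and the backfill requirement can be sketched as a small date calculation: each run processes every short_date between the last successfully processed date and the run date, so a missed run is caught up automatically. This is a minimal sketch under assumptions that are not in the original: the key=value directory layout (the layout Spark's partitionBy() produces), the helper names `partition_path` and `dates_to_process`, the base path `updated_events` and the tenant name `acme` are all hypothetical.

```python
from datetime import date, timedelta


def partition_path(base, tenant, short_date):
    """Build a key=value partition path: <base>/tenant=<t>/short_date=YYYY-MM-DD."""
    return f"{base}/tenant={tenant}/short_date={short_date.isoformat()}"


def dates_to_process(last_processed, run_date):
    """Return every date after last_processed up to and including run_date.

    A normal daily run yields one date; after missed runs it yields the
    whole gap, which is the backfill.
    """
    days = (run_date - last_processed).days
    return [last_processed + timedelta(days=i) for i in range(1, days + 1)]


# Hypothetical example: the job last succeeded for 2024-08-28 and runs on 2024-08-31.
pending = dates_to_process(date(2024, 8, 28), date(2024, 8, 31))
pending_paths = [partition_path("updated_events", "acme", d) for d in pending]
```

Reading only the pending partitions keeps each run proportional to the size of the gap rather than to the full history, which is the main reason to partition by short_date.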




**ROUGH NOTES**

In [None]:
"""CREATE TABLE Events (
  event_id INT,
  event_time TIME,
  common_data_1 VARCHAR(255),
  common_data_2 VARCHAR(255)
  );

CREATE TABLE Updates (
  event_id INT,
  event_time TIME,
  last_modified VARCHAR(255),
  common_data_1 VARCHAR(255),
  specific_data_1 VARCHAR(255)
  );

--INSERT INTO Events(event_id, event_time,common_data_1,common_data_2) VALUES (10,CURRENT_TIMESTAMP,'ABC','EIF');
--INSERT INTO Events(event_id, event_time,common_data_1,common_data_2) VALUES (11,CURRENT_TIMESTAMP,'ABE','EIK');

INSERT INTO UPDATES(event_id, event_time,last_modified,common_data_1,specific_data_1) VALUES (11,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP-1,'ABC','XDX');

SELECT * FROM UPDATES

# MANUALLY DEFINING SCHEMA USING PYSPARK STRUCTTYPE

import pyspark.sql.types
from pyspark.sql.types import *

eventsSchema = StructType([
            StructField('event_id',IntegerType(),True),
            StructField('event_time', TimestampType(),True),
            StructField('common_data_1',StringType(),True),
            StructField('common_data_2',StringType(),True)])

updatesSchema = StructType([
            StructField('event_id',IntegerType(),True),
            StructField('event_time',TimestampType(),True),
            StructField('last_modified',StringType(),True),
            StructField('common_data_1',StringType(),True),
            StructField('specific_data_1',StringType(),True)
            ])

REF

https://medium.com/@uzzaman.ahmed/pyspark-date-time-functions-a-comprehensive-guide-b250e92df264
https://www.chaosgenius.io/blog/databricks-data-types/

https://sparkbyexamples.com/pyspark/pyspark-join-multiple-columns/

https://sparkbyexamples.com/pyspark/pyspark-partitionby-example/  Partitioning Data
"""

