# Spark Mastery



In [12]:
%load_ext autoreload
%autoreload 2

In [13]:
# make sure your run the cell above before running this
import helper

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .getOrCreate()

sc = spark.sparkContext

In [29]:
import csv
import random
from datetime import datetime, timedelta

# Define the number of lines and the headers
num_lines = 500000
headers = ['jobId', 'sendingTeam', 'status', 'startDate', 'endDate']

# Status options for the 'status' field
status_options = ['Pending', 'In Progress', 'Completed', 'Failed']
sending_team_options = ['MindOvermatter', 'JuiceBox', 'AndersonSquad', 'Distributo']

# Function to generate a random date after 2000
def random_date():
    start_date = datetime(2000, 1, 1)
    end_date = datetime.now()
    time_between_dates = end_date - start_date
    days_between_dates = time_between_dates.days
    random_number_of_days = random.randrange(days_between_dates)
    return start_date + timedelta(days=random_number_of_days)

# Creating and writing to the CSV file
with open('jobs_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    
    # Writing the header
    writer.writerow(headers)
    
    # Writing the data rows
    for i in range(1, num_lines + 1):
        # Generate a random status
        status = random.choice(status_options)
        sending_team = random.choice(sending_team_options)
        

        # Generate a random start date
        start_date = random_date().strftime('%Y-%m-%d')

        # Generate an end date for 'Completed' or 'Failed' jobs
        end_date = ''
        if status in ['Completed', 'Failed']:
            end_date = random_date().strftime('%Y-%m-%d')

        # Write the row
        writer.writerow([f"Job{i}", sending_team, status, start_date, end_date])

# Inform the user that the file has been created
print('CSV file created with 500,000 lines.')


CSV file created with 500,000 lines.


In [33]:
# Spark Approach

In [34]:
df = spark.read.csv("jobs_data.csv", header=True, inferSchema=True)
print(df.head)
print(df.columns)
df.show()



<bound method DataFrame.head of DataFrame[jobId: string, sendingTeam: string, status: string, startDate: date, endDate: date]>
['jobId', 'sendingTeam', 'status', 'startDate', 'endDate']
+-----+--------------+-----------+----------+----------+
|jobId|   sendingTeam|     status| startDate|   endDate|
+-----+--------------+-----------+----------+----------+
| Job1|    Distributo|    Pending|2001-09-22|      NULL|
| Job2|      JuiceBox|    Pending|2007-05-23|      NULL|
| Job3|MindOvermatter|    Pending|2018-08-27|      NULL|
| Job4|    Distributo|  Completed|2020-05-21|2014-01-05|
| Job5|MindOvermatter|  Completed|2005-12-30|2013-02-11|
| Job6| AndersonSquad|     Failed|2007-07-23|2015-12-02|
| Job7|      JuiceBox|     Failed|2019-01-03|2003-12-07|
| Job8|    Distributo|    Pending|2019-12-10|      NULL|
| Job9|    Distributo|     Failed|2009-09-22|2015-11-17|
|Job10| AndersonSquad|     Failed|2000-04-10|2014-09-15|
|Job11|    Distributo|  Completed|2020-10-16|2018-10-27|
|Job12| Anderson

                                                                                

In [35]:
# retrieve total statuses
helper.get_status_counts(df)

Status: Completed, Count: 125186
Status: In Progress, Count: 124948
Status: Failed, Count: 124721
Status: Pending, Count: 125145


In [36]:
helper.count_pending_in_progress_jobs(df)

250093

In [37]:
new_df = helper.retry_failed_jobs(helper.get_failed_jobs(df))
new_df.show()

+-----+--------------+-------+----------+----------+
|jobId|   sendingTeam| status| startDate|   endDate|
+-----+--------------+-------+----------+----------+
| Job6| AndersonSquad|Pending|2007-07-23|2015-12-02|
| Job7|      JuiceBox|Pending|2019-01-03|2003-12-07|
| Job9|    Distributo|Pending|2009-09-22|2015-11-17|
|Job10| AndersonSquad|Pending|2000-04-10|2014-09-15|
|Job12| AndersonSquad|Pending|2017-08-21|2017-12-20|
|Job13|MindOvermatter|Pending|2002-05-18|2014-12-23|
|Job18|    Distributo|Pending|2002-06-12|2017-05-17|
|Job27|MindOvermatter|Pending|2011-04-09|2020-12-24|
|Job37| AndersonSquad|Pending|2020-12-12|2010-09-16|
|Job42|      JuiceBox|Pending|2003-01-25|2000-06-14|
|Job46|MindOvermatter|Pending|2007-12-07|2007-11-22|
|Job52|    Distributo|Pending|2015-06-16|2003-07-17|
|Job54|MindOvermatter|Pending|2014-12-01|2006-03-22|
|Job55| AndersonSquad|Pending|2001-06-29|2014-08-14|
|Job56|      JuiceBox|Pending|2021-01-18|2021-01-02|
|Job57|      JuiceBox|Pending|2017-10-13|2007-

In [38]:
team_counts = helper.count_jobs_by_team(df)
team_counts.show()

+--------------+------+
|   sendingTeam| count|
+--------------+------+
|MindOvermatter|124579|
| AndersonSquad|125375|
|    Distributo|125272|
|      JuiceBox|124774|
+--------------+------+



In [39]:
average_durations = helper.average_duration_by_status(df)
average_durations.show()



+-----------+-----------------+
|     status|    avg(duration)|
+-----------+-----------------+
|  Completed|9.948133177831387|
|In Progress|             NULL|
|     Failed|5.749841646555111|
|    Pending|             NULL|
+-----------+-----------------+



                                                                                

In [40]:
completed_jobs = helper.jobs_completed_in_range(df, "2020-01-01", "2021-01-01")
completed_jobs.show()

+-------+--------------+---------+----------+----------+
|  jobId|   sendingTeam|   status| startDate|   endDate|
+-------+--------------+---------+----------+----------+
|  Job16|    Distributo|Completed|2021-08-14|2020-04-21|
| Job307|MindOvermatter|Completed|2015-07-18|2020-08-13|
| Job386|      JuiceBox|Completed|2002-11-02|2020-08-26|
| Job398|    Distributo|Completed|2008-06-06|2020-03-29|
| Job400|      JuiceBox|Completed|2015-01-15|2020-05-21|
| Job490| AndersonSquad|Completed|2003-07-19|2020-02-08|
| Job515| AndersonSquad|Completed|2011-04-09|2020-05-12|
| Job518|      JuiceBox|Completed|2022-05-01|2020-11-05|
| Job559| AndersonSquad|Completed|2008-10-03|2020-06-19|
| Job601|    Distributo|Completed|2023-09-13|2020-04-20|
| Job897| AndersonSquad|Completed|2007-11-28|2020-08-11|
|Job1108| AndersonSquad|Completed|2022-07-09|2020-04-23|
|Job1134|      JuiceBox|Completed|2009-03-07|2020-10-19|
|Job1340|    Distributo|Completed|2007-05-13|2020-06-01|
|Job1456| AndersonSquad|Complet

In [41]:
status_distribution = helper.status_distribution_by_team(df)
status_distribution.show()

+--------------+-----------+-----+
|   sendingTeam|     status|count|
+--------------+-----------+-----+
| AndersonSquad|    Pending|31415|
|    Distributo|     Failed|31369|
| AndersonSquad|  Completed|31359|
|MindOvermatter|    Pending|31229|
|MindOvermatter|In Progress|31308|
| AndersonSquad|     Failed|31074|
|    Distributo|    Pending|31268|
|    Distributo|  Completed|31484|
|      JuiceBox|  Completed|31440|
|MindOvermatter|  Completed|30903|
|      JuiceBox|In Progress|30962|
|MindOvermatter|     Failed|31139|
|      JuiceBox|     Failed|31139|
|    Distributo|In Progress|31151|
| AndersonSquad|In Progress|31527|
|      JuiceBox|    Pending|31233|
+--------------+-----------+-----+



                                                                                