In [1]:
#Note that you only need to execute this block once per session. Once you have the link to Spark (sc has a value),
#it causes an error if you repeat this.

from pyspark.sql import SparkSession

# Create the SparkSession for this notebook, or reuse the one already running.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting — confirm it is needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/02 11:10:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/02 11:10:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/02 11:10:38 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Check we have a Spark context: the cell output should show the Spark version and app name.
# `sc` is the SparkContext behind the session; the RDD calls below (sc.textFile) go through it.
sc = spark.sparkContext
sc

In [3]:
# Sanity check before loading: list the CSV files visible from the notebook's working
# directory, then print that directory.
!ls -l *.csv
!pwd

-rw-r--r-- 1 notebookuser notebookuser 30774793 Oct 28 14:58 DigitalBreathTestData2013.csv
/home/notebookuser/working/data


In [4]:
#Data is Crown Copyright published under Open Government Licence v3.0,
#https://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/

#***Tears before bedtime!!!*** --- Triple check your file is REALLY on the pathname ***
# (presumably because textFile() is lazy, so a wrong path only shows up at the first action — verify)
# Example of the absolute-URI form of the same call:
#lines = sc.textFile("file:///home/student/izje1/DigitalBreathTestData2013.txt")
# Load the CSV as an RDD with one raw text line per element (header line included).
lines = sc.textFile("DigitalBreathTestData2013.csv")

In [5]:
# Separate the CSV header from the data rows.
header = lines.first() #extract header: the first line of the file, returned to the driver
justdata = lines.filter(lambda row: row != header)   #filter out header: keeps every line that differs from it

                                                                                

In [6]:
#First split the file into columns, and just use the first column ('Reason') as the key
def parseline(line):
    """Turn one CSV line into a ``(Reason, 1)`` pair for per-reason counting.

    Args:
        line: One raw comma-separated record; the text before the first
            comma is taken as the Reason.

    Returns:
        A 2-tuple ``(reason, 1)`` suitable for key-based aggregation.
    """
    # Only the first column is needed, so cut the line at the first comma.
    reason, _sep, _rest = line.partition(',')
    return (reason, 1)

In [7]:
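# Now use all the data in the input file
#  NB parseline2 calls int() on the sixth column, so it fails on the header line.
#  The header is filtered out of the RDD (see `justdata`), so there is no need to
#  copy the file and remove the header by hand.
#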
def parseline2(line):
    """Parse one CSV data line into a typed 8-field tuple.

    Columns are read positionally: Reason, Month, Year, WeekType, TimeBand,
    BreathAlcoholLevel, AgeBand, Gender.

    Args:
        line: One raw comma-separated data record (not the header line).

    Returns:
        ``(Reason, Month, Year, WeekType, TimeBand, BreathAlcoholLevel,
        AgeBand, Gender)`` where BreathAlcoholLevel is an ``int`` and Gender
        is an integer code: 1 = Male, 2 = Female, 0 = anything else.

    Raises:
        IndexError: if the line has fewer than eight fields.
        ValueError: if the sixth field is not an integer (e.g. the header).
    """
    fields = line.split(',')
    # Previously every value other than 'Female' (including 'Unknown' or a
    # blank) was coded as 1 and therefore counted as male. Only an explicit
    # 'Male' maps to 1 now; unrecognised values get their own code, 0.
    gender_codes = {'Male': 1, 'Female': 2}
    Gender = gender_codes.get(fields[7].strip(), 0)
    return (
        fields[0],        # Reason
        fields[1],        # Month
        fields[2],        # Year (kept as a string)
        fields[3],        # WeekType
        fields[4],        # TimeBand
        int(fields[5]),   # BreathAlcoholLevel
        fields[6],        # AgeBand
        Gender,
    )

# `header` and `justdata` (the header-free RDD) were already computed right after
# `lines` was loaded. They are reused here rather than recomputed, which avoids a
# second lines.first() Spark job.
#Apply the parseline2 function to the data lines -- evaluation is lazy
allreasons = justdata.map(parseline2)

In [8]:
#Count the parsed records -- count() is an action, so the lazy pipeline is now actually
#evaluated and you get a noticeable delay
allreasons.count()

497790

In [9]:
# Peek at the first three parsed records to sanity-check the tuple layout
allreasons.take(3)

[('Moving Traffic Violation',
  'Jan',
  '2013',
  'Weekday',
  '12am-4am',
  80,
  '30-39',
  1),
 ('Road Traffic Collision',
  'Jan',
  '2013',
  'Weekday',
  '12am-4am',
  0,
  'Other',
  1),
 ('Road Traffic Collision',
  'Jan',
  '2013',
  'Weekday',
  '12am-4am',
  96,
  'Other',
  1)]

In [10]:
#RDD version
#Count the different reason types: the first tuple element (Reason) acts as the key.
#The raw result is shown as-is; it could be formatted using print, printf etc
allreasons.countByKey()

defaultdict(int,
            {'Moving Traffic Violation': 179064,
             'Road Traffic Collision': 168526,
             'Suspicion of Alcohol': 94685,
             'Other': 55372,
             'Unknown': 143})

In [11]:
# Build a DataFrame from the parsed RDD, taking the column names from the CSV header line
df = spark.createDataFrame(allreasons, header.split(','))

In [12]:
# Show the column names and types of the new DataFrame
df.printSchema()

root
 |-- Reason: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- WeekType: string (nullable = true)
 |-- TimeBand: string (nullable = true)
 |-- BreathAlcoholLevel(microg 100ml): long (nullable = true)
 |-- AgeBand: string (nullable = true)
 |-- Gender: long (nullable = true)



In [13]:
# Row count of the DataFrame (compare with allreasons.count() above)
df.count()

                                                                                

497790

In [14]:
# Expose the DataFrame to Spark SQL under the name "BreathData".
# registerTempTable() has been deprecated since Spark 2.0; createOrReplaceTempView()
# is its replacement and can be re-run safely because it replaces an existing view.
df.createOrReplaceTempView("BreathData")



In [15]:
# Basic SQL query against the "BreathData" view. spark.sql() only describes the
# query; collect() executes it and brings the rows back to the driver.
distinct_reasons = spark.sql("select distinct Reason from BreathData")
reason_rows = distinct_reasons.collect()
for reason_row in reason_rows:
    print(reason_row)

[Stage 9:>                                                          (0 + 2) / 2]

Row(Reason='Suspicion of Alcohol')
Row(Reason='Road Traffic Collision')
Row(Reason='Unknown')
Row(Reason='Other')
Row(Reason='Moving Traffic Violation')


                                                                                

In [16]:
# Give the alcohol-level column a plain name (no brackets or spaces) for the queries below
df2 = df.withColumnRenamed('BreathAlcoholLevel(microg 100ml)', 'AlcoholLevel')

In [17]:
from pyspark.sql.functions import col

# Split the tests into 'male' and 'female' sub-DataFrames using the numeric
# Gender code assigned in parseline2 (1 for Male, 2 for Female).
mAlltests = df2.filter("Gender = 1")
fAlltests = df2.filter("Gender = 2")

# Positive subsets: tests whose alcohol level is above 35.
mPostests = mAlltests.filter("AlcoholLevel > 35")
fPostests = fAlltests.filter("AlcoholLevel > 35")

In [18]:
# Count the positive tests for each gender (each count() runs a Spark job),
# males first, then report both.
male_positive_count, female_positive_count = mPostests.count(), fPostests.count()

print(f"Positive tests for Males: {male_positive_count}")
print(f"Positive tests for Females: {female_positive_count}")

[Stage 15:>                                                         (0 + 2) / 2]

Positive tests for Males: 43029
Positive tests for Females: 8459


                                                                                

In [19]:
# Total number of tests per month.
all_tests_monthly = (
    df2.groupBy("Month")
    .count()
    .withColumnRenamed("count", "total_tests")
)

# Positive tests (alcohol level above 35) and their count per month.
positive_tests = df2.filter("AlcoholLevel > 35")
positive_tests_monthly = (
    positive_tests.groupBy("Month")
    .count()
    .withColumnRenamed("count", "positive_tests")
)

# Put both counts side by side, one row per month.
proportions_df = all_tests_monthly.join(positive_tests_monthly, on="Month")

# fail_proportion = positive / total, highest proportion first.
fail_ratio = proportions_df["positive_tests"] / proportions_df["total_tests"]
proportions_final_df = (
    proportions_df
    .withColumn("fail_proportion", fail_ratio)
    .orderBy("fail_proportion", ascending=False)
)

print("Proportion of Failed Breath Tests by Month:")
# Convert to Pandas for a nicely formatted display, as suggested by the lab
proportions_final_df.toPandas()

Proportion of Failed Breath Tests by Month:


                                                                                

Unnamed: 0,Month,total_tests,positive_tests,fail_proportion
0,Aug,29303,4813,0.164249
1,Sep,25480,3951,0.155063
2,May,32617,4909,0.150504
3,Oct,27025,4045,0.149676
4,Apr,29663,4126,0.139096
5,Mar,31351,4357,0.138975
6,Nov,31469,4191,0.133179
7,Jul,34535,4575,0.132474
8,Feb,24982,3192,0.127772
9,Jan,26045,3033,0.116452


In [20]:
# Inspect the schema as a StructType object (column names, types, nullability)
df2.schema

StructType(List(StructField(Reason,StringType,true),StructField(Month,StringType,true),StructField(Year,StringType,true),StructField(WeekType,StringType,true),StructField(TimeBand,StringType,true),StructField(AlcoholLevel,LongType,true),StructField(AgeBand,StringType,true),StructField(Gender,LongType,true)))

In [21]:
# Convert the monthly-proportions Spark DataFrame to a Pandas DataFrame
print("Converting to Pandas DataFrame...")
# The %time line magic reports how long the conversion statement takes
%time pandas_df = proportions_final_df.toPandas()
# Run a basic summary command: describe() prints summary statistics per numeric column
print("\nPandas DataFrame Summary:")
print(pandas_df.describe())

Converting to Pandas DataFrame...
CPU times: user 3.9 ms, sys: 896 µs, total: 4.8 ms
Wall time: 67.9 ms

Pandas DataFrame Summary:
         total_tests  positive_tests  fail_proportion
count      12.000000       12.000000        12.000000
mean    41482.500000     4290.666667         0.125956
std     29879.015721      688.409938         0.037102
min     24982.000000     3033.000000         0.042046
25%     26780.000000     4021.500000         0.124942
50%     30507.000000     4274.000000         0.136077
75%     33096.500000     4837.000000         0.149883
max    121914.000000     5170.000000         0.164249


In [22]:
# Tally the tests per Reason (element 0 of every parsed tuple).
reason_counts = (
    allreasons
    .map(lambda record: (record[0], 1))
    .reduceByKey(lambda left, right: left + right)
)

# Largest count first: sort ascending on the negated count.
sorted_reason_counts = reason_counts.sortBy(lambda pair: -pair[1])

# Display the top 5 most common reasons
sorted_reason_counts.take(5)

                                                                                

[('Moving Traffic Violation', 179064),
 ('Road Traffic Collision', 168526),
 ('Suspicion of Alcohol', 94685),
 ('Other', 55372),
 ('Unknown', 143)]

In [23]:
from pyspark.sql.functions import desc

# Profile of FAILED tests, part 1: which time bands produce the most failures?
print("\n--- Profile of a FAILED Test ---")
print("Most Common Time of Day for a FAILED Test:")
failed_by_timeband = positive_tests.groupBy("TimeBand").count()
failed_by_timeband.orderBy(desc("count")).show(n=5, truncate=False)



--- Profile of a FAILED Test ---
Most Common Time of Day for a FAILED Test:


[Stage 42:>                                                         (0 + 2) / 2]

+--------+-----+
|TimeBand|count|
+--------+-----+
|12am-4am|17655|
|8pm-12pm|12878|
|4pm-8pm |6729 |
|4am-8am |5964 |
|8am-12pm|4292 |
+--------+-----+
only showing top 5 rows



                                                                                

In [24]:
# Profile of FAILED tests, part 2: which age bands produce the most failures?
print("Most Common Age Group for a FAILED Test:")
failed_by_ageband = positive_tests.groupBy("AgeBand").count()
failed_by_ageband.orderBy(desc("count")).show(n=5, truncate=False)

Most Common Age Group for a FAILED Test:


[Stage 45:>                                                         (0 + 2) / 2]

+-------+-----+
|AgeBand|count|
+-------+-----+
|30-39  |10957|
|40-49  |8442 |
|20-24  |8000 |
|25-29  |7224 |
|Other  |7149 |
+-------+-----+
only showing top 5 rows



                                                                                

#From here to the bottom continues the normal exercise for week 4

In [25]:
# Positive tests only (alcohol level above 35), highest reading first.
df3 = (
    df2
    .filter(df2["AlcoholLevel"] > 35)
    .orderBy(df2["AlcoholLevel"].desc())
)

In [26]:
# Show the ten highest positive readings
df3.show(10)

                                                                                

+--------------------+-----+----+--------+--------+------------+-------+------+
|              Reason|Month|Year|WeekType|TimeBand|AlcoholLevel|AgeBand|Gender|
+--------------------+-----+----+--------+--------+------------+-------+------+
|Moving Traffic Vi...|  Mar|2013| Weekday| 4am-8am|         745|  40-49|     1|
|Road Traffic Coll...|  May|2013| Weekend| 4am-8am|         676|  16-19|     1|
|Suspicion of Alcohol|  Jun|2013| Weekday|12am-4am|         667|  20-24|     1|
|Road Traffic Coll...|  Nov|2013| Weekday|12am-4am|         565|  70-98|     1|
|Suspicion of Alcohol|  Apr|2013| Weekend| 4am-8am|         562|  20-24|     1|
|Road Traffic Coll...|  Oct|2013| Weekday|8am-12pm|         538|  25-29|     1|
|Road Traffic Coll...|  Nov|2013| Weekend|12am-4am|         531|  Other|     1|
|Suspicion of Alcohol|  Apr|2013| Weekday|12am-4am|         527|  30-39|     2|
|Suspicion of Alcohol|  Mar|2013| Weekday|8pm-12pm|         518|  50-59|     1|
|Suspicion of Alcohol|  Apr|2013| Weekda

In [27]:
from pyspark.sql.types import *

# Explicit schema for reading the CSV directly: Year and AlcoholLevel are
# integers, every other column is a string, and all columns are nullable.
myschema2 = (
    StructType()
    .add("Reason", StringType(), True)
    .add("Month", StringType(), True)
    .add("Year", IntegerType(), True)
    .add("WeekType", StringType(), True)
    .add("TimeBand", StringType(), True)
    .add("AlcoholLevel", IntegerType(), True)
    .add("AgeBand", StringType(), True)
    .add("Gender", StringType(), True)
)

In [28]:
# Read the CSV straight into a DataFrame using the explicit schema `myschema2`.
# The schema's column names are the ones used, so the sixth column already arrives
# as "AlcoholLevel". The former
# .withColumnRenamed('BreathAlcoholLevel(microg 100ml)', 'AlcoholLevel') never
# matched a column and was a silent no-op, so it has been removed.
# Spark logs a CSVHeaderChecker warning because that schema name differs from the
# file's header; this is expected here.
df4 = spark.read.load("DigitalBreathTestData2013.csv",
                      format="csv", sep=",", schema=myschema2,
                      header=True)

In [29]:
# Preview the first ten rows read with the explicit schema (Gender stays a string here)
df4.show(10)

+--------------------+-----+----+--------+--------+------------+-------+------+
|              Reason|Month|Year|WeekType|TimeBand|AlcoholLevel|AgeBand|Gender|
+--------------------+-----+----+--------+--------+------------+-------+------+
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|          80|  30-39|  Male|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|           0|  Other|  Male|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|          96|  Other|  Male|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|           0|  40-49|Female|
|Suspicion of Alcohol|  Jan|2013| Weekday|12am-4am|           0|  40-49|  Male|
|Road Traffic Coll...|  Jan|2013| Weekday|8am-12pm|          45|  Other|  Male|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|          60|  30-39|  Male|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|           0|  16-19|  Male|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|           0|  16-19|  Male|
|Moving Traffic Vi...|  Jan|2013| Weekda

25/11/02 11:11:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Reason, Month, Year, WeekType, TimeBand, BreathAlcoholLevel(microg 100ml), AgeBand, Gender
 Schema: Reason, Month, Year, WeekType, TimeBand, AlcoholLevel, AgeBand, Gender
Expected: AlcoholLevel but found: BreathAlcoholLevel(microg 100ml)
CSV file: file:///home/notebookuser/working/data/DigitalBreathTestData2013.csv


In [30]:
# Confirm the DataFrame carries the types declared in myschema2
df4.schema

StructType(List(StructField(Reason,StringType,true),StructField(Month,StringType,true),StructField(Year,IntegerType,true),StructField(WeekType,StringType,true),StructField(TimeBand,StringType,true),StructField(AlcoholLevel,IntegerType,true),StructField(AgeBand,StringType,true),StructField(Gender,StringType,true)))