In [2]:
from pyspark import SparkContext, SparkConf 
sc = SparkContext.getOrCreate()
sc

In [3]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

In [5]:
#*** --- ensure the correct path this step is critical ***
lines = sc.textFile("data/DigitalBreathTestData2013.csv")

In [6]:
# Now use all the data in the input file
#
def parseline2(line):
    fields = line.split(',')
    Reason = fields[0]
    Month = fields[1]
    Year = fields[2]
    WeekType = fields[3]
    TimeBand = fields[4]
    BreathAlcoholLevel = int(fields[5])
    AgeBand = fields[6]
    if fields[7]=='Female':
        Gender = 2
    else:
        Gender = 1
    return (Reason,Month,Year,WeekType,TimeBand,BreathAlcoholLevel,AgeBand,Gender)


#The CSV file includes a text giveing the column headings. We can either remove that first line,
# or put error checking around the int() in parseline2 (below). Here we apply a filter
header = lines.first() #extract header
justdata = lines.filter(lambda row: row != header)   #filter out header
rdd = justdata.map(parseline2) # split lines into colimnd

In [7]:
# force a reduce operatio excellent to make errors appear that may have been delayed
print(rdd.count())


497790


In [8]:
# showing the first 3 results as counting might take a while.
#Here we just return a few rows. This is just for debugging
rdd.take(3)

[('Moving Traffic Violation',
  'Jan',
  '2013',
  'Weekday',
  '12am-4am',
  80,
  '30-39',
  1),
 ('Road Traffic Collision',
  'Jan',
  '2013',
  'Weekday',
  '12am-4am',
  0,
  'Other',
  1),
 ('Road Traffic Collision',
  'Jan',
  '2013',
  'Weekday',
  '12am-4am',
  96,
  'Other',
  1)]

In [9]:
#Each of the Dataframe headrs corresponds to a column name
df = sqlCtx.createDataFrame(rdd, ['Reason','Month','Year','WeekType','TimeBand','BreathAlcoholLevel','AgeBand','Gender'])

In [10]:
#look at the dataframe.
df.show()

+--------------------+-----+----+--------+--------+------------------+-------+------+
|              Reason|Month|Year|WeekType|TimeBand|BreathAlcoholLevel|AgeBand|Gender|
+--------------------+-----+----+--------+--------+------------------+-------+------+
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                80|  30-39|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|                 0|  Other|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|                96|  Other|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                 0|  40-49|     2|
|Suspicion of Alcohol|  Jan|2013| Weekday|12am-4am|                 0|  40-49|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|8am-12pm|                45|  Other|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                60|  30-39|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                 0|  16-19|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|   

In [11]:
#The dataframe is also an RDD.
df.count()

497790

In [12]:
df.printSchema()

root
 |-- Reason: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- WeekType: string (nullable = true)
 |-- TimeBand: string (nullable = true)
 |-- BreathAlcoholLevel: long (nullable = true)
 |-- AgeBand: string (nullable = true)
 |-- Gender: long (nullable = true)



In [13]:
# Make the table name available to select using SQL queries. 
df.registerTempTable("BreathData")

In [14]:
#Basic SQL Operation (RDDs are immutable so consider queries carefully). Note that we must collect() results from our query 
# or its execution is delayed.
distinct_reasons = sqlCtx.sql("select distinct Reason from BreathData")
for b in distinct_reasons.collect(): 
    print(b)

Row(Reason='Suspicion of Alcohol')
Row(Reason='Road Traffic Collision')
Row(Reason='Unknown')
Row(Reason='Other')
Row(Reason='Moving Traffic Violation')


In [15]:
# here is a slightly more complex SQL query
results = sqlCtx.sql("SELECT Reason, COUNT(*) AS cnt FROM BreathData WHERE BreathAlcoholLevel >35 GROUP BY Reason ORDER BY cnt DESC LIMIT 10").collect()

In [16]:
df_results = sqlCtx.createDataFrame(results)

In [17]:
df_results.show()

+--------------------+-----+
|              Reason|  cnt|
+--------------------+-----+
|Suspicion of Alcohol|16328|
|Road Traffic Coll...|16325|
|Moving Traffic Vi...|11038|
|               Other| 7789|
|             Unknown|    8|
+--------------------+-----+



# Data preparation

In [18]:
from pyspark.sql.functions import col

# Prepare 'male' and 'female' sub-dataframes 
mAlltests = df.filter(col("Gender") == 1)
fAlltests = df.filter(col("Gender") == 2)

# Prepare positive (BreathAlcoholLevel > 35) subsets [cite: 25]
mPostests = mAlltests.filter(col("BreathAlcoholLevel") > 35)
fPostests = fAlltests.filter(col("BreathAlcoholLevel") > 35)

# Male and female: Initial investigation

In [19]:
# Count the positive tests for each gender
male_positive_count = mPostests.count()
female_positive_count = fPostests.count()

print(f"Positive tests for Males: {male_positive_count}")
print(f"Positive tests for Females: {female_positive_count}")

Positive tests for Males: 43029
Positive tests for Females: 8459


# Consider month of the year

In [20]:
# Calculate total tests per month from the main DataFrame
all_tests_monthly = df.groupBy("Month").count().withColumnRenamed("count", "total_tests")

# Calculate positive tests per month
positive_tests = df.filter(col("BreathAlcoholLevel") > 35)
positive_tests_monthly = positive_tests.groupBy("Month").count().withColumnRenamed("count", "positive_tests")

# Join the two DataFrames 
proportions_df = all_tests_monthly.join(positive_tests_monthly, "Month")

# Add a new column to calculate the proportion
proportions_final_df = proportions_df.withColumn(
    "fail_proportion",
    (col("positive_tests") / col("total_tests"))
).orderBy(col("fail_proportion").desc())

print("Proportion of Failed Breath Tests by Month:")

# Convert to Pandas for a nicely formatted display, as suggested by the lab
proportions_final_df.toPandas()

Proportion of Failed Breath Tests by Month:


Unnamed: 0,Month,total_tests,positive_tests,fail_proportion
0,Aug,29303,4813,0.164249
1,Sep,25480,3951,0.155063
2,May,32617,4909,0.150504
3,Oct,27025,4045,0.149676
4,Apr,29663,4126,0.139096
5,Mar,31351,4357,0.138975
6,Nov,31469,4191,0.133179
7,Jul,34535,4575,0.132474
8,Feb,24982,3192,0.127772
9,Jan,26045,3033,0.116452


In [21]:
# Convert your Spark DataFrame to a Pandas DataFrame
print("Converting to Pandas DataFrame...")
# The %time command is used to measure how long the conversion takes
%time pandas_df = proportions_final_df.toPandas()
# Run a basic summary command
print("\nPandas DataFrame Summary:")
print(pandas_df.describe())


Converting to Pandas DataFrame...
CPU times: user 9.19 ms, sys: 2.34 ms, total: 11.5 ms
Wall time: 131 ms

Pandas DataFrame Summary:
         total_tests  positive_tests  fail_proportion
count      12.000000       12.000000        12.000000
mean    41482.500000     4290.666667         0.125956
std     29879.015721      688.409938         0.037102
min     24982.000000     3033.000000         0.042046
25%     26780.000000     4021.500000         0.124942
50%     30507.000000     4274.000000         0.136077
75%     33096.500000     4837.000000         0.149883
max    121914.000000     5170.000000         0.164249


# Advanced Analysis

### 1 Of the five reasons, what is the most likely reason to be asked to take a breath test? 

In [22]:
# Group by 'Reason' and count occurrences
reason_counts = rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

# Sort by count in descending order
sorted_reason_counts = reason_counts.sortBy(lambda x: -x[1])

# Display the top 5 most common reasons
sorted_reason_counts.take(5)

[('Moving Traffic Violation', 179064),
 ('Road Traffic Collision', 168526),
 ('Suspicion of Alcohol', 94685),
 ('Other', 55372),
 ('Unknown', 143)]

#### 2	For a failed breath test (in other words, BreathAlcoholLevel exceeds 35 microgrammes per 100 millilitres of breath) what is the most common time of day, age, etc.)?   


In [23]:
from pyspark.sql.functions import desc

# For a FAILED test, what is the most common time of day and age?
print("\n--- Profile of a FAILED Test ---")
print("Most Common Time of Day for a FAILED Test:")
positive_tests.groupBy("TimeBand").count().orderBy(desc("count")).show(5, truncate=False)




--- Profile of a FAILED Test ---
Most Common Time of Day for a FAILED Test:
+--------+-----+
|TimeBand|count|
+--------+-----+
|12am-4am|17655|
|8pm-12pm|12878|
|4pm-8pm |6729 |
|4am-8am |5964 |
|8am-12pm|4292 |
+--------+-----+
only showing top 5 rows



#### As you repeat these essentially repetitive tasks, I used Pyspark sql methods, rather than sql strings. 

In [25]:
print("Most Common Age Group for a FAILED Test:")
positive_tests.groupBy("AgeBand").count().orderBy(desc("count")).show(5, truncate=False)

Most Common Age Group for a FAILED Test:
+-------+-----+
|AgeBand|count|
+-------+-----+
|30-39  |10957|
|40-49  |8442 |
|20-24  |8000 |
|25-29  |7224 |
|Other  |7149 |
+-------+-----+
only showing top 5 rows
