In [1]:
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("Python Spark SQL")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()

sc = spark.sparkContext

In [None]:
!ls -l ../Data
!pwd

In [2]:
lines=sc.textFile("DigitalBreathTestData2013.txt")

lines.collect()

['Reason,Month,Year,WeekType,TimeBand,BreathAlcoholLevel(microg 100ml),AgeBand,Gender',
 'Moving Traffic Violation,Jan,2013,Weekday,12am-4am,80,30-39,Male',
 'Road Traffic Collision,Jan,2013,Weekday,12am-4am,0,Other,Male',
 'Road Traffic Collision,Jan,2013,Weekday,12am-4am,96,Other,Male',
 'Moving Traffic Violation,Jan,2013,Weekday,12am-4am,0,40-49,Female',
 'Suspicion of Alcohol,Jan,2013,Weekday,12am-4am,0,40-49,Male',
 'Road Traffic Collision,Jan,2013,Weekday,8am-12pm,45,Other,Male',
 'Moving Traffic Violation,Jan,2013,Weekday,12am-4am,60,30-39,Male',
 'Moving Traffic Violation,Jan,2013,Weekday,12am-4am,0,16-19,Male',
 'Moving Traffic Violation,Jan,2013,Weekday,12am-4am,0,16-19,Male',
 'Moving Traffic Violation,Jan,2013,Weekday,8am-12pm,0,50-59,Male',
 'Moving Traffic Violation,Jan,2013,Weekday,8pm-12pm,0,30-39,Male',
 'Suspicion of Alcohol,Jan,2013,Weekday,12am-4am,110,30-39,Male',
 'Suspicion of Alcohol,Jan,2013,Weekday,12am-4am,46,16-19,Male',
 'Road Traffic Collision,Jan,2013,Wee

In [3]:
header = lines.first()
justdata = lines.filter(lambda row: row != header) 

In [7]:
def parseline(line):
    fields = line.split(',')
    Reason = fields[0]
    return (Reason, 1)

In [5]:
#Apply the parseline function to lines
allreasons = lines.map(parseline)

In [6]:
allreasons.count()

497791

In [8]:
#Count the different reason types -- just output the value.
allreasons.countByKey()

defaultdict(int,
            {'Reason': 1,
             'Moving Traffic Violation': 179064,
             'Road Traffic Collision': 168526,
             'Suspicion of Alcohol': 94685,
             'Other': 55372,
             'Unknown': 143})

In [10]:
def parseline2(line):
    fields = line.split(',')
    Reason = fields[0]
    Month = fields[1]
    Year = fields[2]
    WeekType = fields[3]
    TimeBand = fields[4]
    BreathAlcoholLevel = int(fields[5])
    AgeBand = fields[6]
    if fields[7]=='Female':
        Gender = 2
    else:
        Gender = 1
    return (Reason,Month,Year,WeekType,TimeBand,BreathAlcoholLevel,AgeBand,Gender)

In [11]:
header = lines.first() #extract header
justdata = lines.filter(lambda row: row != header)   #filter out header
rdd = justdata.map(parseline2) # split lines into colimnd

rdd.count()

497790

In [13]:
df = spark.createDataFrame(rdd, header.split(','))

In [14]:
df.show()

+--------------------+-----+----+--------+--------+--------------------------------+-------+------+
|              Reason|Month|Year|WeekType|TimeBand|BreathAlcoholLevel(microg 100ml)|AgeBand|Gender|
+--------------------+-----+----+--------+--------+--------------------------------+-------+------+
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                              80|  30-39|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|                               0|  Other|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|                              96|  Other|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                               0|  40-49|     2|
|Suspicion of Alcohol|  Jan|2013| Weekday|12am-4am|                               0|  40-49|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|8am-12pm|                              45|  Other|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                              60|  30-39|     1|


In [15]:
df.printSchema()

root
 |-- Reason: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- WeekType: string (nullable = true)
 |-- TimeBand: string (nullable = true)
 |-- BreathAlcoholLevel(microg 100ml): long (nullable = true)
 |-- AgeBand: string (nullable = true)
 |-- Gender: long (nullable = true)



In [16]:
from pyspark.sql.types import *
myschema = StructType(
    [StructField("Reason",StringType(),True),
     StructField("Month",StringType(),True),
     StructField("Year",IntegerType(),True),
     StructField("WeekType",StringType(),True),
     StructField("TimeBand",StringType(),True),
     StructField("AlcoholLevel",IntegerType(),True),
     StructField("AgeBand",StringType(),True),
     StructField("Gender",StringType(),True)])

In [17]:
df.show()

+--------------------+-----+----+--------+--------+--------------------------------+-------+------+
|              Reason|Month|Year|WeekType|TimeBand|BreathAlcoholLevel(microg 100ml)|AgeBand|Gender|
+--------------------+-----+----+--------+--------+--------------------------------+-------+------+
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                              80|  30-39|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|                               0|  Other|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|12am-4am|                              96|  Other|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                               0|  40-49|     2|
|Suspicion of Alcohol|  Jan|2013| Weekday|12am-4am|                               0|  40-49|     1|
|Road Traffic Coll...|  Jan|2013| Weekday|8am-12pm|                              45|  Other|     1|
|Moving Traffic Vi...|  Jan|2013| Weekday|12am-4am|                              60|  30-39|     1|


In [19]:
df.schema

StructType([StructField('Reason', StringType(), True), StructField('Month', StringType(), True), StructField('Year', StringType(), True), StructField('WeekType', StringType(), True), StructField('TimeBand', StringType(), True), StructField('BreathAlcoholLevel(microg 100ml)', LongType(), True), StructField('AgeBand', StringType(), True), StructField('Gender', LongType(), True)])