In [1]:
# PySpark API:
#http://spark.apache.org/docs/latest/api/python/index.html

import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/spark2.4.3"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# define SQLContext
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("appName2")
sc = SparkContext(conf=conf)

# initialise sc
#from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)


# Import libraries and other functions
from io import StringIO
from collections import namedtuple
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

import csv
# import python library for plotting data
import matplotlib.pyplot as plt
import gmplot
import pandas as pd


In [2]:
#picking up the transportation data from the data path
#data available from https://data.world/chhs/b1008bb6-2f54-4b49-a0e0-c4d1bc9440ff
calitran=sc.textFile ("/user/imat5322_user119423/transportation california.csv/")

In [3]:
#using a spark action to test if data was imported
#count the the total count of the data
calitran.count()

202204

In [4]:
#view the first five rows
calitran.take(5)

['ind_id,ind_definition,reportyear,race_eth_code,race_eth_name,geotype,geotypevalue,geoname,county_name,county_fips,region_name,region_code,mode,mode_name,pop_total,pop_mode,percent,LL95CI_percent,UL95CI_percent,percent_se,percent_rse,CA_decile,CA_RR,version',
 '42,Percent of residents mode of transportation to work,2000,1,AIAN,CA,06,California,,,,,ATHOME,Worked at home,73560,2414,3.281674823,,,,,,0.855732547,12/12/2013',
 '42,Percent of residents mode of transportation to work,2000,2,Asian,CA,06,California,,,,,ATHOME,Worked at home,1634923,42902,2.624099117,,,,,,0.684262501,12/12/2013',
 '42,Percent of residents mode of transportation to work,2000,3,AfricanAm,CA,06,California,,,,,ATHOME,Worked at home,797818,21348,2.67579824,,,,,,0.697743612,12/12/2013',
 '42,Percent of residents mode of transportation to work,2000,4,Latino,CA,06,California,,,,,ATHOME,Worked at home,3865125,75246,1.946793441,,,,,,0.507647649,12/12/2013']

In [5]:
#viewing the header of the data
calitran.first()

'ind_id,ind_definition,reportyear,race_eth_code,race_eth_name,geotype,geotypevalue,geoname,county_name,county_fips,region_name,region_code,mode,mode_name,pop_total,pop_mode,percent,LL95CI_percent,UL95CI_percent,percent_se,percent_rse,CA_decile,CA_RR,version'

In [6]:
#defining the header of the data
calitranheader= calitran.first()

In [7]:
#filtering out the header so it contains data only 
calitrandataonly = calitran.filter(lambda x:x!=calitranheader)

In [8]:
#viewing the first row of the filtered data which doesn't contain the header
calitrandataonly.first()

'42,Percent of residents mode of transportation to work,2000,1,AIAN,CA,06,California,,,,,ATHOME,Worked at home,73560,2414,3.281674823,,,,,,0.855732547,12/12/2013'

In [9]:
#finding any zero length records, not to affect analysis later on
calitranmissing = calitrandataonly.filter(lambda x:len(x)==0)

In [10]:
# count of any zero length records
calitranmissing.count()

0

In [11]:
#mapping the data to be splitted by comma
calitranmap = calitrandataonly.map(lambda x:x .split(','))

In [34]:
calitranmap.first()

['42',
 'Percent of residents mode of transportation to work',
 '2000',
 '1',
 'AIAN',
 'CA',
 '06',
 'California',
 '',
 '',
 '',
 '',
 'ATHOME',
 'Worked at home',
 '73560',
 '2414',
 '3.281674823',
 '',
 '',
 '',
 '',
 '',
 '0.855732547',
 '12/12/2013']

In [35]:
calitrandf= sqlContext.createDataFrame(calitranmap)

In [36]:
calitrandf

DataFrame[_1: string, _2: string, _3: string, _4: string, _5: string, _6: string, _7: string, _8: string, _9: string, _10: string, _11: string, _12: string, _13: string, _14: string, _15: string, _16: string, _17: string, _18: string, _19: string, _20: string, _21: string, _22: string, _23: string, _24: string]

In [40]:
Schemacalitrandf= calitranmap.map(lambda x: Row( reportyear=int(x[2]), race_eth_code=int(x[3]),
                                               race_eth_name=str(x[4]), geotype=str(x[5]),
                                               geovalue=int(x[6]), geoname=str(x[7]),
                                               mode=str(x[12]), mode_name=str(x[13]),
                                               pop_total=int(x[14]),
                                             pop_mode=int(x[15]), percent=float(x[16])  ))


In [41]:
calitrandf2= sqlContext.createDataFrame(Schemacalitrandf)

In [42]:
calitrandf2.printSchema()

root
 |-- geoname: string (nullable = true)
 |-- geotype: string (nullable = true)
 |-- geovalue: long (nullable = true)
 |-- mode: string (nullable = true)
 |-- mode_name: string (nullable = true)
 |-- percent: double (nullable = true)
 |-- pop_mode: long (nullable = true)
 |-- pop_total: long (nullable = true)
 |-- race_eth_code: long (nullable = true)
 |-- race_eth_name: string (nullable = true)
 |-- reportyear: long (nullable = true)



In [47]:
sqlContext.registerDataFrameAsTable(calitrandf2,"calitrantable")

In [49]:
#show contents of the sql table
sqlContext.sql("select reportyear, geonamne,mode from calitrantable").show()

AnalysisException: "cannot resolve '`geonamne`' given input columns: [calitrantable.mode, calitrantable.race_eth_code, calitrantable.pop_mode, calitrantable.reportyear, calitrantable.geotype, calitrantable.pop_total, calitrantable.percent, calitrantable.race_eth_name, calitrantable.mode_name, calitrantable.geovalue, calitrantable.geoname]; line 1 pos 19;\n'Project [reportyear#114L, 'geonamne, mode#107]\n+- SubqueryAlias `calitrantable`\n   +- LogicalRDD [geoname#104, geotype#105, geovalue#106L, mode#107, mode_name#108, percent#109, pop_mode#110L, pop_total#111L, race_eth_code#112L, race_eth_name#113, reportyear#114L], false\n"

In [13]:
#Explicitly creating a schema for selected fields only
#Not all fields are required for the analysis undertaken 
Schemacalitran = StructType([StructField('reportyear', IntegerType(), True),\
                             StructField('race_eth_code', IntegerType(), True), \
                            StructField('race_eth_name', StringType(), True),\
                             StructField('geotype', StringType(), True),\
                             StructField('geovalue', IntegerType(), True),\
                             StructField('geoname', StringType(), True),\
                             StructField('mode', StringType(), True),\
                             StructField('mode_name', StringType(), True),\
                             StructField('pop_total', IntegerType(), True),\
                             StructField('pop_mode', IntegerType(), True),\
                             StructField('percent', FloatType(), True),])



In [14]:
#Mapping columns to row entries
#Not all fields are required for the analysis undertaken
calitranschema = calitranmap.map(lambda x:(( int(x[2]),int(x[3]),str(x[4]),
                                             str(x[5]),int(x[6]),str(x[7]),
                                             str(x[12]),str(x[13]),int(x[14]),
                                             int(x[15]),float(x[16])  )))


In [15]:
# create a dataframe for selected data
dfcalitran = sqlContext.createDataFrame(calitranschema,Schemacalitran)


In [16]:
#viewing and validating dataframe
dfcalitran

DataFrame[reportyear: int, race_eth_code: int, race_eth_name: string, geotype: string, geovalue: int, geoname: string, mode: string, mode_name: string, pop_total: int, pop_mode: int, percent: float]

In [17]:
#viewing the schema of the dataframe
dfcalitran.printSchema()

root
 |-- reportyear: integer (nullable = true)
 |-- race_eth_code: integer (nullable = true)
 |-- race_eth_name: string (nullable = true)
 |-- geotype: string (nullable = true)
 |-- geovalue: integer (nullable = true)
 |-- geoname: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- mode_name: string (nullable = true)
 |-- pop_total: integer (nullable = true)
 |-- pop_mode: integer (nullable = true)
 |-- percent: float (nullable = true)



In [18]:
#showing the first two observation in the dataframe
#confirming columns and datatype 
dfcalitran.head(2)

[Row(reportyear=2000, race_eth_code=1, race_eth_name='AIAN', geotype='CA', geovalue=6, geoname='California', mode='ATHOME', mode_name='Worked at home', pop_total=73560, pop_mode=2414, percent=3.281674861907959),
 Row(reportyear=2000, race_eth_code=2, race_eth_name='Asian', geotype='CA', geovalue=6, geoname='California', mode='ATHOME', mode_name='Worked at home', pop_total=1634923, pop_mode=42902, percent=2.624099016189575)]

In [19]:
dfcalitran.show(10,truncate= False)

+----------+-------------+-------------+-------+--------+----------+-------+--------------+---------+--------+----------+
|reportyear|race_eth_code|race_eth_name|geotype|geovalue|geoname   |mode   |mode_name     |pop_total|pop_mode|percent   |
+----------+-------------+-------------+-------+--------+----------+-------+--------------+---------+--------+----------+
|2000      |1            |AIAN         |CA     |6       |California|ATHOME |Worked at home|73560    |2414    |3.2816749 |
|2000      |2            |Asian        |CA     |6       |California|ATHOME |Worked at home|1634923  |42902   |2.624099  |
|2000      |3            |AfricanAm    |CA     |6       |California|ATHOME |Worked at home|797818   |21348   |2.6757982 |
|2000      |4            |Latino       |CA     |6       |California|ATHOME |Worked at home|3865125  |75246   |1.9467934 |
|2000      |5            |NHOPI        |CA     |6       |California|ATHOME |Worked at home|42880    |1088    |2.5373135 |
|2000      |6           

In [20]:
dfcalitran

DataFrame[reportyear: int, race_eth_code: int, race_eth_name: string, geotype: string, geovalue: int, geoname: string, mode: string, mode_name: string, pop_total: int, pop_mode: int, percent: float]

In [30]:
#Registering sqltable for dataframe
sqlContext.registerDataFrameAsTable(dfcalitran,"calitrantable")

In [32]:
calitrantable

NameError: name 'calitrantable' is not defined

In [33]:
#show contents of the sql table
sqlContext.sql("select reportyear, geonamne,mode from Calitran_table").show()

AnalysisException: 'Table or view not found: Calitran_table; line 1 pos 38'