In [19]:
#Functions to convert data
from pyspark.sql import Row
from datetime import datetime

def toIntSafe(inval):
    """Convert ``inval`` to an int, returning None when it cannot be parsed.

    Args:
        inval: Value to convert, normally a string field from a CSV row.

    Returns:
        The integer value, or None if ``inval`` is not a valid integer
        literal (e.g. an empty string) or is of an unsupported type
        (e.g. None).
    """
    try:
        return int(inval)
    except (ValueError, TypeError):
        # ValueError: unparseable text such as "" or "abc".
        # TypeError: non-string/non-numeric input such as None.
        return None

def toTimeSafe(inval):
    """Parse a "YYYY-MM-DD HH:MM:SS.ffffff" timestamp string.

    Args:
        inval: Timestamp text in the format "%Y-%m-%d %H:%M:%S.%f".

    Returns:
        A ``datetime`` for the parsed timestamp, or None if ``inval`` does
        not match the format or is not a string (e.g. None).
    """
    try:
        return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: strptime was handed a non-string such as None.
        return None

def toDateSafe(inval):
    """Parse a "M/D/YYYY" date string.

    Args:
        inval: Date text in the format "%m/%d/%Y" (e.g. "3/1/2014").

    Returns:
        A ``datetime`` at midnight of the parsed day, or None if ``inval``
        does not match the format or is not a string (e.g. None).
    """
    try:
        return datetime.strptime(inval, "%m/%d/%Y")
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: strptime was handed a non-string such as None.
        return None
    
def toFloatSafe(inval):
    """Convert ``inval`` to a float, returning None when it cannot be parsed.

    Args:
        inval: Value to convert, normally a string field from a CSV row.

    Returns:
        The float value, or None if ``inval`` is not a valid float literal
        (e.g. "" or a non-numeric marker such as "T") or is of an
        unsupported type (e.g. None).
    """
    try:
        return float(inval)
    except (ValueError, TypeError):
        # ValueError: unparseable text.
        # TypeError: non-string/non-numeric input such as None.
        return None

In [20]:
# Path (relative to the working directory) of the weather CSV to load.
file_name = (
    'babs_open_data_year_1/201408_babs_open_data/'
    '201408_weather_data.csv'
)

In [21]:
# Step 1: Read the file as an RDD of text lines, then drop the header line
weather_201408_raw = sc.textFile(file_name)
header = weather_201408_raw.first()  # the first line holds the column names
# Keep every line that is not identical to the header text.
weather_201408 = weather_201408_raw.filter(lambda line: line != header)

In [22]:
# Step 2: Break each line into its comma-separated fields, then freeze each
# list of fields into a tuple
weather_201408_split = weather_201408.map(lambda line: line.split(","))
weather_201408_tuple = weather_201408_split.map(tuple)

In [7]:
# Check: Print out tuple-formatted data
# Takes 5 elements of the tuple RDD so the field layout can be inspected
# before any type conversion is applied.
weather_201408_tuple.take(5)

In [35]:
#Step 3: Define Conversion Function
def convStationString(r):
    """Convert one split weather record into a typed ``Row``.

    Args:
        r: Indexable sequence of at least 24 string fields, in CSV column
            order.

    Returns:
        A ``Row`` of 24 positional values: each field passed through its
        column's converter (None where conversion fails), with the two
        pass-through columns kept as the raw strings.
    """
    # One entry per column, in order; None means "keep the raw string".
    converters = (
        toDateSafe,    # 0  Date
        toIntSafe,     # 1  max_temp
        toIntSafe,     # 2  mean_temp
        toIntSafe,     # 3  min_temp
        toIntSafe,     # 4  max_dp
        toIntSafe,     # 5  mean_dp
        toIntSafe,     # 6  min_dp
        toIntSafe,     # 7  max_humidity
        toIntSafe,     # 8  mean_humidity
        toIntSafe,     # 9  min_humidity
        toFloatSafe,   # 10 max_pressure
        toFloatSafe,   # 11 mean_pressure
        toFloatSafe,   # 12 min_pressure
        toIntSafe,     # 13 max_visibility
        toIntSafe,     # 14 mean_visibility
        toIntSafe,     # 15 min_visibility
        toIntSafe,     # 16 max_ws
        toIntSafe,     # 17 mean_ws
        toIntSafe,     # 18 max_gust
        toFloatSafe,   # 19 precipitation
        toIntSafe,     # 20 cloud_cover
        None,          # 21 events
        toIntSafe,     # 22 wind_dir
        None,          # 23 zip_code
    )

    values = []
    # Index explicitly (rather than zip) so a short record still raises
    # IndexError instead of silently yielding a truncated Row.
    for position, convert in enumerate(converters):
        field = r[position]
        values.append(field if convert is None else convert(field))
    return Row(*values)

In [36]:
# Step 4: Define Structure Type
from pyspark.sql.types import *

def _build_station_schema():
    """Assemble the 24-column schema; every column is nullable."""
    integer, floating, string = IntegerType(), FloatType(), StringType()
    # (column name, column type) in the same order as the converted rows.
    columns = [
        ("Date", DateType()),
        ("max_temp", integer),
        ("mean_temp", integer),
        ("min_temp", integer),
        ("max_dp", integer),
        ("mean_dp", integer),
        ("min_dp", integer),
        ("max_humidity", integer),
        ("mean_humidity", integer),
        ("min_humidity", integer),
        ("max_pressure", floating),
        ("mean_pressure", floating),
        ("min_pressure", floating),
        ("max_visibility", integer),
        ("mean_visibility", integer),
        ("min_visibility", integer),
        ("max_ws", integer),
        ("mean_ws", integer),
        ("max_gust", integer),
        ("precipitation", floating),
        ("cloud_cover", integer),
        ("events", string),
        ("wind_dir", integer),
        ("zip_code", string),
    ]
    return StructType([StructField(label, kind, True) for label, kind in columns])


stationSchema = _build_station_schema()

In [37]:
# Step 5: Apply the per-record conversion to every tuple
weather_201408_RDD = weather_201408_tuple.map(convStationString)

In [38]:
# Step 6: Create DataFrame with Schema
# The converted rows are positional, so stationSchema is passed explicitly to
# supply the column names and types.
weather_201408_DF = sqlContext.createDataFrame(weather_201408_RDD, stationSchema)

In [39]:
# Check: Print Schema
# The printed column names and types should mirror stationSchema.
weather_201408_DF.printSchema()

root
 |-- Date: date (nullable = true)
 |-- max_temp: integer (nullable = true)
 |-- mean_temp: integer (nullable = true)
 |-- min_temp: integer (nullable = true)
 |-- max_dp: integer (nullable = true)
 |-- mean_dp: integer (nullable = true)
 |-- min_dp: integer (nullable = true)
 |-- max_humidity: integer (nullable = true)
 |-- mean_humidity: integer (nullable = true)
 |-- min_humidity: integer (nullable = true)
 |-- max_pressure: float (nullable = true)
 |-- mean_pressure: float (nullable = true)
 |-- min_pressure: float (nullable = true)
 |-- max_visibility: integer (nullable = true)
 |-- mean_visibility: integer (nullable = true)
 |-- min_visibility: integer (nullable = true)
 |-- max_ws: integer (nullable = true)
 |-- mean_ws: integer (nullable = true)
 |-- max_gust: integer (nullable = true)
 |-- precipitation: float (nullable = true)
 |-- cloud_cover: integer (nullable = true)
 |-- events: string (nullable = true)
 |-- wind_dir: integer (nullable = true)
 |-- zip_code: string (n

In [40]:
# Check: Print First 20 Rows of Data Frame
# Fields that failed conversion were replaced with None by the converters, so
# look for unexpected nulls here.
weather_201408_DF.show()

+----------+--------+---------+--------+------+-------+------+------------+-------------+------------+------------+-------------+------------+--------------+---------------+--------------+------+-------+--------+-------------+-----------+------+--------+--------+
|      Date|max_temp|mean_temp|min_temp|max_dp|mean_dp|min_dp|max_humidity|mean_humidity|min_humidity|max_pressure|mean_pressure|min_pressure|max_visibility|mean_visibility|min_visibility|max_ws|mean_ws|max_gust|precipitation|cloud_cover|events|wind_dir|zip_code|
+----------+--------+---------+--------+------+-------+------+------------+-------------+------------+------------+-------------+------------+--------------+---------------+--------------+------+-------+--------+-------------+-----------+------+--------+--------+
|2014-03-01|      69|       62|      54|    54|     48|    44|          78|           63|          48|       29.82|        29.63|       29.47|            10|             10|            10|    17|     10|     

In [14]:
# Check: Compare to raw data
# Takes 20 of the header-stripped text lines so the converted DataFrame
# values can be checked by eye against the original strings.
weather_201408.take(20)

[u'3/1/2014,69,62,54,54,48,44,78,63,48,29.82,29.63,29.47,10,10,10,17,10,25,0.03,7,Rain,108,94107',
 u'3/2/2014,61,57,53,55,51,47,83,75,67,30.17,30.01,29.83,10,9,2,20,9,26,0.02,6,Rain,181,94107',
 u'3/3/2014,61,56,50,52,49,47,89,77,64,30.15,30.11,30.04,10,10,9,18,6,28,0.1,7,Rain,135,94107',
 u'3/4/2014,66,61,55,54,52,49,86,77,67,30.12,30.09,30.06,10,10,7,17,5,,T,7,Rain,16,94107',
 u'3/5/2014,66,60,54,58,54,48,93,73,52,30.11,30.06,29.99,10,8,2,20,6,23,0.26,8,Rain,211,94107',
 u'3/6/2014,65,58,51,57,53,48,93,80,67,30.2,30.13,30.06,10,9,4,15,8,20,0.01,6,Rain,258,94107',
 u'3/7/2014,66,57,48,50,47,43,89,67,44,30.19,30.15,30.11,10,10,9,14,6,24,0,2,,296,94107',
 u'3/8/2014,72,59,46,51,48,39,93,63,33,30.21,30.16,30.12,10,10,10,17,4,48,0,5,Fog,285,94107',
 u'3/9/2014,71,63,55,54,51,48,80,69,57,30.17,30.13,30.07,10,10,10,13,6,18,T,7,Rain,291,94107',
 u'3/10/2014,68,62,56,55,48,43,86,66,45,30.24,30.17,30.11,10,10,9,20,8,22,T,5,Rain,307,94107',
 u'3/11/2014,74,61,47,40,31,25,61,40,19,30.19,30.1,30