In [1]:
# -*- coding: utf-8 -*-
"""
-----------------------------------------------------------------------------

           Spark with Python: Setup Spyder IDE for Spark

             Copyright : V2 Maestros @2016

Run this once per session (originally written for Spyder on Windows).
It switches to the data directory, puts the local Spark installation's
PySpark and py4j on ``sys.path`` and creates the SparkContext ``sc``.
-----------------------------------------------------------------------------
"""

import glob
import os
import sys

# Directory holding the fuel_data_*.csv files. Set FUEL_DATA_DIR to override
# instead of editing this absolute path.
DATA_DIR = os.environ.get("FUEL_DATA_DIR",
                          "D:/SPARK/Practice_Problems/process_macro_data_fuel")
os.chdir(DATA_DIR)
# os.curdir is just the constant '.'; getcwd() shows where relative paths resolve.
print(os.getcwd())

# Configure the environment. Set this up to the directory where
# Spark is installed.
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = 'C:/Spark/spark-1.6.0-bin-hadoop2.6'

# Root of the Spark installation.
SPARK_HOME = os.environ['SPARK_HOME']

spark_python = os.path.join(SPARK_HOME, "python")
spark_lib = os.path.join(spark_python, "lib")
# The py4j zip name carries a version number that changes between Spark
# releases, so look it up instead of hard-coding it; fall back to the name
# shipped with spark-1.6.0 if nothing matches.
py4j_zips = sorted(glob.glob(os.path.join(spark_lib, "py4j-*-src.zip")))
py4j_zip = py4j_zips[-1] if py4j_zips else os.path.join(spark_lib, "py4j-0.9-src.zip")

# Each insert(0, ...) lands in front of the previous one, so the resulting
# sys.path order is: py4j zip, pyspark.zip, lib, python.
for entry in (spark_python, spark_lib, os.path.join(spark_lib, "pyspark.zip"), py4j_zip):
    sys.path.insert(0, entry)

# Must come after the sys.path setup above.
from pyspark import SparkConf, SparkContext

# Optionally configure Spark settings.
conf = SparkConf()
conf.setMaster("local")
# NOTE(review): 'local' runs a single worker thread inside the driver;
# spark.cores.max / spark.executor.memory are cluster-mode settings. Use
# 'local[4]' and spark.driver.memory if parallelism/memory was intended.
conf.set("spark.executor.memory", "12g")
conf.set("spark.cores.max", "4")
conf.setAppName("ma")

# getOrCreate returns the already-running context when this cell is re-run,
# instead of raising the "multiple SparkContexts" error. (If a context already
# exists, its original configuration is kept and `conf` is ignored.)
sc = SparkContext.getOrCreate(conf)


Import Spark SQL and create the SQL context

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
import collections
from pyspark.sql.types import *
# Entry point for DataFrame / SQL functionality, bound to the SparkContext `sc`.
sqlContext = SQLContext(sparkContext=sc)

Load the data

In [75]:
# RDD of raw text lines from the CSV (the header line is included).
fuelData = sc.textFile("fuel_data_1.csv")
fuelData.count()

1209

Select the header line

In [76]:
# The first line of the file is the CSV header.
header = fuelData.first()
header

u'Date,EMM_EPM0_PTE_R10_DPG,EMM_EPM0_PTE_R1X_DPG,EMM_EPM0_PTE_R1Y_DPG,EMM_EPM0_PTE_R1Z_DPG,EMM_EPM0_PTE_R20_DPG,EMM_EPM0_PTE_R30_DPG,EMM_EPM0_PTE_R40_DPG,EMM_EPM0_PTE_R50_DPG,EMM_EPM0_PTE_R5XCA_DPG'

Split the header to get the schema fields

In [77]:
# One nullable string field per header column; real types are assigned next.
fields = [StructField(column, StringType(), nullable=True)
          for column in header.split(',')]
fields

[StructField(Date,StringType,true),
 StructField(EMM_EPM0_PTE_R10_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R1X_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R1Y_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R1Z_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R20_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R30_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R40_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R50_DPG,StringType,true),
 StructField(EMM_EPM0_PTE_R5XCA_DPG,StringType,true)]

Assign the appropriate data types

In [78]:
# Column 0 is the date; columns 1-9 are numeric.
fields[0].dataType = DateType()
for col_idx in range(1, 10):
    fields[col_idx].dataType = FloatType()
fields

[StructField(Date,DateType,true),
 StructField(EMM_EPM0_PTE_R10_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R1X_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R1Y_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R1Z_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R20_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R30_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R40_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R50_DPG,FloatType,true),
 StructField(EMM_EPM0_PTE_R5XCA_DPG,FloatType,true)]

Construct the schema; it is used below to build the DataFrame.

In [79]:
schema = StructType(fields=fields)  # typed schema for createDataFrame below

Remove the header line using a filter

In [80]:
# Keep every line except the header line itself.
fuelFile = fuelData.filter(lambda line: line != header)
fuelFile.take(1)

[u'4/5/1993,1.04,1.068,1.068,1.023,1.061,1.064,1.093,1.152,0']

Parse each line into typed values that match the schema

In [86]:
def safe_cast(val, to_type, default=None):
    """Convert ``val`` with ``to_type``, returning ``default`` if that fails.

    Used to turn raw CSV fields into numbers without aborting the Spark job
    on a bad cell: ``float('')`` / ``float('abc')`` raise ValueError and
    ``float(None)`` raises TypeError; both now yield ``default``.

    Args:
        val: value to convert (typically a string from ``line.split(',')``).
        to_type: conversion callable, e.g. ``float``.
        default: value returned when the conversion raises ValueError/TypeError.

    Returns:
        ``to_type(val)`` on success, otherwise ``default``.
    """
    try:
        return to_type(val)
    except (TypeError, ValueError):
        return default
import datetime

# Convert each CSV line into a tuple matching `schema`:
#   column 0    -> datetime, parsed as M/D/YYYY (e.g. '4/5/1993')
#   columns 1-9 -> float, with empty/non-numeric cells replaced by 0.0
# `parse` is not imported anywhere in this notebook (presumably
# dateutil.parser.parse from a deleted cell), so the stdlib strptime is used.
fueltemp = (fuelFile
            .map(lambda line: line.split(","))
            .map(lambda p: (datetime.datetime.strptime(p[0], "%m/%d/%Y"),)
                           + tuple(safe_cast(v, float, 0.0) for v in p[1:10])))
fueltemp.take(1)

[(datetime.datetime(1993, 4, 5, 0, 0),
  1.04,
  1.068,
  1.068,
  1.023,
  1.061,
  1.064,
  1.093,
  1.152,
  0.0)]

In [87]:
# Apply the explicit typed schema to the parsed tuples.
fuelDF = sqlContext.createDataFrame(data=fueltemp, schema=schema)
fuelDF.show(n=20)

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+
|      Date|EMM_EPM0_PTE_R10_DPG|EMM_EPM0_PTE_R1X_DPG|EMM_EPM0_PTE_R1Y_DPG|EMM_EPM0_PTE_R1Z_DPG|EMM_EPM0_PTE_R20_DPG|EMM_EPM0_PTE_R30_DPG|EMM_EPM0_PTE_R40_DPG|EMM_EPM0_PTE_R50_DPG|EMM_EPM0_PTE_R5XCA_DPG|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+
|1993-04-05|                1.04|               1.068|               1.068|               1.023|               1.061|               1.064|               1.093|               1.152|                   0.0|
|1993-04-12|               1.047|               1.073|               1.072|               1.032|               1.077|               1.071|               1.118|               1.154|    

Add a new column to the Spark SQL DataFrame

In [108]:
from pyspark.sql.functions import lit

# lit(1) turns the Python constant into a Column, so every row gets File = 1.
fuelDF1 = fuelDF.withColumn("File", lit(1))
fuelDF1.show(n=20)

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----+
|      Date|EMM_EPM0_PTE_R10_DPG|EMM_EPM0_PTE_R1X_DPG|EMM_EPM0_PTE_R1Y_DPG|EMM_EPM0_PTE_R1Z_DPG|EMM_EPM0_PTE_R20_DPG|EMM_EPM0_PTE_R30_DPG|EMM_EPM0_PTE_R40_DPG|EMM_EPM0_PTE_R50_DPG|EMM_EPM0_PTE_R5XCA_DPG|File|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----+
|1993-04-05|                1.04|               1.068|               1.068|               1.023|               1.061|               1.064|               1.093|               1.152|                   0.0|   1|
|1993-04-12|               1.047|               1.073|               1.072|               1.032|               1.077|               1.071|               1.118|     

In [92]:
# parallelize() iterates its argument, so the string "F" yields the single
# element 'F' -- the same as passing a one-element list.
file = sc.parallelize(["F"])
file.collect()

['F']

Convert from RDD to DataFrame and vice versa


In [102]:
testRDD = fuelDF1.rdd  # DataFrame -> RDD of Row objects




In [103]:
# RDD of Rows -> DataFrame. No schema is passed, so it is re-inferred from the
# Rows: the output shows the float columns coming back as double and File as bigint.
sqlContext.createDataFrame(testRDD)

DataFrame[Date: date, EMM_EPM0_PTE_R10_DPG: double, EMM_EPM0_PTE_R1X_DPG: double, EMM_EPM0_PTE_R1Y_DPG: double, EMM_EPM0_PTE_R1Z_DPG: double, EMM_EPM0_PTE_R20_DPG: double, EMM_EPM0_PTE_R30_DPG: double, EMM_EPM0_PTE_R40_DPG: double, EMM_EPM0_PTE_R50_DPG: double, EMM_EPM0_PTE_R5XCA_DPG: double, File: bigint]

In [106]:
import datetime

# Rebuild the fuel_data_1.csv DataFrame end-to-end in a single cell.
fuelData1 = sc.textFile("fuel_data_1.csv")
header1 = fuelData1.first()

fields1 = [StructField(field_name, StringType(), True) for field_name in header1.split(',')]
fields1[0].dataType = DateType()
for col_idx in range(1, 10):
    fields1[col_idx].dataType = FloatType()
schema1 = StructType(fields1)

fuelFile1 = fuelData1.filter(lambda line: line != header1)
# Reuses safe_cast from the "Assigning datatype through schema" cell rather than
# redefining it. Dates are M/D/YYYY; `parse` is never imported in this notebook.
fueltemp1 = (fuelFile1
             .map(lambda line: line.split(","))
             .map(lambda p: (datetime.datetime.strptime(p[0], "%m/%d/%Y"),)
                            + tuple(safe_cast(v, float, 0.0) for v in p[1:10])))

# Deliberately NOT named fuelDF1: the "Adding new column" cell defines fuelDF1
# *with* the File column, and the unionAll cell below unions it with fuelDF2.
# Overwriting fuelDF1 here (10 columns) made that union fail under Restart & Run All.
fuelDF1_base = sqlContext.createDataFrame(fueltemp1, schema1)
fuelDF2 = fuelDF1_base.withColumn("File", lit(1))
fuelDF2.show()

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----+
|      Date|EMM_EPM0_PTE_R10_DPG|EMM_EPM0_PTE_R1X_DPG|EMM_EPM0_PTE_R1Y_DPG|EMM_EPM0_PTE_R1Z_DPG|EMM_EPM0_PTE_R20_DPG|EMM_EPM0_PTE_R30_DPG|EMM_EPM0_PTE_R40_DPG|EMM_EPM0_PTE_R50_DPG|EMM_EPM0_PTE_R5XCA_DPG|File|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----+
|1993-04-05|                1.04|               1.068|               1.068|               1.023|               1.061|               1.064|               1.093|               1.152|                   0.0|   1|
|1993-04-12|               1.047|               1.073|               1.072|               1.032|               1.077|               1.071|               1.118|     

In [109]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    """Stack DataFrames vertically (SQL UNION ALL; duplicate rows are kept).

    ``DataFrame.unionAll`` matches columns by position, so frames whose columns
    are in a different order would be silently mixed up. Every frame after the
    first is therefore re-selected in the first frame's column order (matching
    by name). Frames whose column names differ raise ValueError.

    Args:
        *dfs: one or more DataFrames with the same set of column names.

    Returns:
        The unioned DataFrame (the single input itself if only one is given).

    Raises:
        TypeError: if no DataFrames are given.
        ValueError: if a frame's column names differ from the first frame's.
    """
    if not dfs:
        raise TypeError("unionAll() requires at least one DataFrame")
    columns = list(dfs[0].columns)
    for other in dfs[1:]:
        if sorted(other.columns) != sorted(columns):
            raise ValueError("unionAll: column mismatch %r vs %r"
                             % (list(other.columns), columns))
    return reduce(lambda acc, nxt: acc.unionAll(nxt.select(*columns)), dfs)

unionAll(fuelDF2, fuelDF1).show(n=2)  # first two rows of the stacked frames

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----+
|      Date|EMM_EPM0_PTE_R10_DPG|EMM_EPM0_PTE_R1X_DPG|EMM_EPM0_PTE_R1Y_DPG|EMM_EPM0_PTE_R1Z_DPG|EMM_EPM0_PTE_R20_DPG|EMM_EPM0_PTE_R30_DPG|EMM_EPM0_PTE_R40_DPG|EMM_EPM0_PTE_R50_DPG|EMM_EPM0_PTE_R5XCA_DPG|File|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----+
|1993-04-05|                1.04|               1.068|               1.068|               1.023|               1.061|               1.064|               1.093|               1.152|                   0.0|   1|
|1993-04-12|               1.047|               1.073|               1.072|               1.032|               1.077|               1.071|               1.118|     

In [112]:
import glob

# glob returns matches in arbitrary (filesystem-dependent) order; sort so the
# file list -- and anything indexed by it -- is reproducible across machines.
x = sorted(glob.glob("*.csv"))
print(x)

['fuel_data_1.csv', 'fuel_data_2.csv', 'fuel_data_3.csv']


In [120]:
# Start from an empty RDD and append every CSV's lines onto it.
# Each file's header line is kept, so a header appears once per file.
file = sc.parallelize([])
for i in x:
    file_i = sc.textFile(i)
    file = file.union(file_i)  # RDD `+` is the same union
print(file)
file.collect()

UnionRDD[193] at union at null:-2


[u'Date,EMM_EPM0_PTE_R10_DPG,EMM_EPM0_PTE_R1X_DPG,EMM_EPM0_PTE_R1Y_DPG,EMM_EPM0_PTE_R1Z_DPG,EMM_EPM0_PTE_R20_DPG,EMM_EPM0_PTE_R30_DPG,EMM_EPM0_PTE_R40_DPG,EMM_EPM0_PTE_R50_DPG,EMM_EPM0_PTE_R5XCA_DPG',
 u'4/5/1993,1.04,1.068,1.068,1.023,1.061,1.064,1.093,1.152,0',
 u'4/12/1993,1.047,1.073,1.072,1.032,1.077,1.071,1.118,1.154,0',
 u'4/19/1993,1.054,1.074,1.077,1.04,1.067,1.081,1.12,1.155,0',
 u'4/26/1993,1.059,1.076,1.08,1.046,1.078,1.081,1.169,1.157,0',
 u'5/3/1993,1.062,1.08,1.084,1.05,1.073,1.084,1.161,1.161,0',
 u'5/10/1993,1.069,1.091,1.09,1.057,1.091,1.088,1.143,1.171,0',
 u'5/17/1993,1.073,1.096,1.095,1.06,1.107,1.097,1.136,1.177,0',
 u'5/24/1993,1.075,1.096,1.094,1.064,1.103,1.096,1.17,1.178,0',
 u'5/31/1993,1.078,1.097,1.099,1.065,1.102,1.096,1.164,1.186,0',
 u'6/7/1993,1.078,1.102,1.098,1.064,1.095,1.095,1.149,1.188,0',
 u'6/14/1993,1.076,1.103,1.097,1.062,1.092,1.094,1.137,1.188,0',
 u'6/21/1993,1.075,1.101,1.095,1.062,1.077,1.09,1.132,1.186,0',
 u'6/28/1993,1.072,1.1,1.095,1.0


Create an empty DataFrame from the schema

In [144]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Schema built from the typed `fields` list above.
schema_i = StructType(fields=fields)

# Empty Spark DataFrame with `schema` and no rows.
# Alternative: df = sc.parallelize([]).toDF(schema)
df = sqlContext.createDataFrame(data=[], schema=schema)


Create one DataFrame from all CSV files in a loop

In [173]:
import datetime
import glob

import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, FloatType, StringType, StructField, StructType

# Sorted so each CSV gets the same File id on every run: glob returns
# matches in arbitrary (filesystem-dependent) order.
x = sorted(glob.glob("*.csv"))
print(x)


def load_fuel_csv(path, file_id):
    """Read one fuel CSV with Spark and return it as a pandas DataFrame.

    The file's own first line is used as the header. Column 0 is typed as
    DateType (parsed as M/D/YYYY, e.g. '4/5/1993') and columns 1-9 as
    FloatType; empty or non-numeric cells become 0.0 via ``safe_cast``.
    A constant ``File`` column holding ``file_id`` is appended.

    Args:
        path: CSV path, relative to the current working directory.
        file_id: value written to the ``File`` column of every row.

    Returns:
        pandas.DataFrame with the header's columns plus ``File``.
    """
    lines = sc.textFile(path)
    file_header = lines.first()

    file_fields = [StructField(name, StringType(), True) for name in file_header.split(',')]
    file_fields[0].dataType = DateType()
    for col_idx in range(1, 10):
        file_fields[col_idx].dataType = FloatType()

    # Drop *this* file's header line. (The previous version filtered on
    # `header1`, the header of fuel_data_1.csv, which is only correct when
    # every file has exactly the same header line.)
    body = lines.filter(lambda line: line != file_header)
    # safe_cast is defined in the "Assigning datatype through schema" cell.
    rows = (body
            .map(lambda line: line.split(","))
            .map(lambda p: (datetime.datetime.strptime(p[0], "%m/%d/%Y"),)
                           + tuple(safe_cast(v, float, 0.0) for v in p[1:10])))
    return (sqlContext.createDataFrame(rows, StructType(file_fields))
            .withColumn("File", lit(file_id))
            .toPandas())


# Build every per-file frame first, then concatenate once: DataFrame.append in
# a loop is quadratic and was removed in pandas 2.0. ignore_index gives unique
# row labels instead of repeating 0..n-1 for each file.
frames = [load_fuel_csv(path, file_id) for file_id, path in enumerate(x)]
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
df.head()

['fuel_data_1.csv', 'fuel_data_2.csv', 'fuel_data_3.csv']


Unnamed: 0,Date,EMM_EPM0_PTE_R10_DPG,EMM_EPM0_PTE_R1X_DPG,EMM_EPM0_PTE_R1Y_DPG,EMM_EPM0_PTE_R1Z_DPG,EMM_EPM0_PTE_R20_DPG,EMM_EPM0_PTE_R30_DPG,EMM_EPM0_PTE_R40_DPG,EMM_EPM0_PTE_R50_DPG,EMM_EPM0_PTE_R5XCA_DPG,File
0,1993-04-05,1.04,1.068,1.068,1.023,1.061,1.064,1.093,1.152,0,0
1,1993-04-12,1.047,1.073,1.072,1.032,1.077,1.071,1.118,1.154,0,0
2,1993-04-19,1.054,1.074,1.077,1.04,1.067,1.081,1.12,1.155,0,0
3,1993-04-26,1.059,1.076,1.08,1.046,1.078,1.081,1.169,1.157,0,0
4,1993-05-03,1.062,1.08,1.084,1.05,1.073,1.084,1.161,1.161,0,0


In [174]:
df["File"].unique()  # distinct File ids present in the combined frame

array([0, 1, 2], dtype=int64)

In [154]:
# Name of the second CSV in the file list.
x[1]

'fuel_data_2.csv'