In [1]:
# Libraries 
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from hashlib import md5

In [2]:
# Spark entry point for the notebook.
# The configuration object is now actually handed to the context (it was
# previously created but never used), and getOrCreate() reuses an already
# running context, so re-executing this cell no longer fails because a
# second SparkContext cannot be started in the same process.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf)

In [3]:
# Load the two raw course datasets (math and Portuguese).
# Both files are ';'-separated and start with a header line. No schema is
# supplied here, so the map functions further down convert the numeric
# columns they need explicitly.
mat_performance = SQLContext(sc).read.options(sep=';', header=True).csv('student-mat.csv')
por_performance = SQLContext(sc).read.options(sep=';', header=True).csv('student-por.csv')

In [4]:
# Map functions
def generate_key_mat(row):
    """Return ``(key, math_fields)`` for one record of the math dataset.

    ``key`` is the MD5 hex digest of the space-separated values of the
    identifying columns listed below, so records with the same values in
    those columns get the same key.

    ``math_fields`` is a new dict that holds only the course-specific
    columns, renamed with an ``M_`` prefix. Grades, study time and failures
    are converted to ``float``; ``paid`` is copied unchanged. ``row`` itself
    is not modified.
    """
    # Columns whose values identify a student; the order matters for the hash.
    key_columns = (
        'school', 'sex', 'age', 'address', 'famsize', 'Pstatus',
        'Medu', 'Fedu', 'Mjob', 'Fjob', 'guardian', 'traveltime',
        'freetime', 'goout', 'famrel', 'Dalc', 'Walc',
    )
    fingerprint = " ".join(row[column] for column in key_columns)
    digest = md5(fingerprint.encode('utf-8')).hexdigest()  # Unique Key For Mapping

    math_fields = {
        'M_G1': float(row['G1']),
        'M_G2': float(row['G2']),
        'M_G3': float(row['G3']),
        'M_studytime': float(row['studytime']),
        'M_failures': float(row['failures']),
        'M_paid': row['paid'],
    }
    return (digest, math_fields)

def generate_key_por(row):
    """Return ``(key, row)`` for one record of the Portuguese dataset.

    ``key`` is the MD5 hex digest of the space-separated values of the
    identifying columns listed below. It is computed before ``traveltime``
    is converted, i.e. from the original string value.

    ``row`` is modified in place and the same dict is returned:
    the course-specific columns are re-added with a ``P_`` prefix (grades,
    study time and failures as ``float``; ``paid`` unchanged), ``traveltime``
    and ``absences`` are converted to ``int``, and the unprefixed
    course-specific columns are removed. All other columns are kept as is.
    """
    # Columns whose values identify a student; the order matters for the hash.
    key_columns = (
        'school', 'sex', 'age', 'address', 'famsize', 'Pstatus',
        'Medu', 'Fedu', 'Mjob', 'Fjob', 'guardian', 'traveltime',
        'freetime', 'goout', 'famrel', 'Dalc', 'Walc',
    )
    fingerprint = " ".join(row[column] for column in key_columns)
    digest = md5(fingerprint.encode('utf-8')).hexdigest()  # Unique Key For Mapping

    # Prefixed numeric copies of the course-specific columns.
    for source in ('G1', 'G2', 'G3', 'studytime', 'failures'):
        row['P_' + source] = float(row[source])
    row['P_paid'] = row['paid']

    # Shared columns that the output schema stores as integers.
    for column in ('traveltime', 'absences'):
        row[column] = int(row[column])

    # Drop the unprefixed originals now that the P_ copies exist.
    for source in ('studytime', 'failures', 'paid', 'G1', 'G2', 'G3'):
        row.pop(source)

    return (digest, row)

def combine_mat_por_data(row):
    """Merge the two joined records of a ``(key, (left, right))`` item.

    Returns a new dict holding every entry of ``left`` and ``right``; where
    both contain the same field, the value from ``right`` wins. The key in
    ``row[0]`` is dropped and neither input record is modified.
    """
    joined_pair = row[1]
    left_record, right_record = joined_pair[0], joined_pair[1]

    merged = dict(left_record)
    merged.update(right_record)
    return merged

In [5]:
# Key every record of each dataset with the mapper written for that dataset.
# The two mappers were previously swapped (math rows went through
# generate_key_por and Portuguese rows through generate_key_mat), which
# stored Portuguese grades in the M_* columns and math grades in the P_*
# columns of the combined output.
mat_performance_rdd = mat_performance.rdd.map(lambda row: row.asDict(recursive=True)).map(generate_key_mat)
por_performance_rdd = por_performance.rdd.map(lambda row: row.asDict(recursive=True)).map(generate_key_por)

# Inner join on the hashed key: only keys present in both datasets survive.
# Each joined item is (key, (por_row, mat_fields)); combine_mat_por_data
# flattens it into one dict. The shared (unprefixed) columns therefore come
# from the Portuguese record, and the M_* columns from the math record.
combined_data = por_performance_rdd.join(mat_performance_rdd).map(combine_mat_por_data)

In [6]:
# Schema of the combined output: one nullable column per (name, type) pair,
# in the order listed. Shared columns come first, followed by the math (M_*)
# and Portuguese (P_*) course-specific columns.
students_db_schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        # Shared columns
        ('school', StringType),
        ('sex', StringType),
        ('address', StringType),
        ('famsize', StringType),
        ('Pstatus', StringType),
        ('Medu', StringType),
        ('Fedu', StringType),
        ('Mjob', StringType),
        ('Fjob', StringType),
        ('reason', StringType),
        ('guardian', StringType),
        ('traveltime', IntegerType),
        ('higher', StringType),
        ('internet', StringType),
        ('famrel', StringType),
        ('freetime', StringType),
        ('goout', StringType),
        ('Dalc', StringType),
        ('Walc', StringType),
        ('health', StringType),
        ('absences', IntegerType),
        # Math course columns
        ('M_studytime', DoubleType),
        ('M_failures', DoubleType),
        ('M_paid', StringType),
        ('M_G1', DoubleType),
        ('M_G2', DoubleType),
        ('M_G3', DoubleType),
        # Portuguese course columns
        ('P_studytime', DoubleType),
        ('P_failures', DoubleType),
        ('P_paid', StringType),
        ('P_G1', DoubleType),
        ('P_G2', DoubleType),
        ('P_G3', DoubleType),
    )
])

In [7]:
# Turn the merged dict records into a typed DataFrame using the schema above.
combined_mat_por_df = SQLContext(sc).createDataFrame(combined_data, students_db_schema)

# Write everything as a single ';'-separated part file with a header row.
# coalesce(1) collapses the data into one partition so only one part file is
# produced inside the 'students_combined_data' directory.
# mode='overwrite' replaces the output of a previous run; with the default
# save mode this cell fails as soon as the directory already exists, which
# breaks Restart & Run All.
combined_mat_por_df.coalesce(1).write.csv('students_combined_data', sep=";", header=True, mode='overwrite')

In [None]:
# Alternate way to store to csv (disabled).
# The code below is wrapped in a triple-quoted string, so it is NOT executed;
# the cell only evaluates to that string. If enabled, it would collect all
# rows of combined_mat_por_df to the driver and write them to
# 'students_combined_data.csv' with csv.DictWriter, using ';' as delimiter.
# NOTE(review): its field_names list has no 'freetime' entry although
# students_db_schema defines that column, so 'freetime' would be missing from
# the file written this way - confirm whether that is intended before enabling.
'''records = combined_mat_por_df.rdd.collect()
field_names  = ['school', 'sex', 'address', 'famsize', 'Pstatus','Medu', 'Fedu', 'Mjob', 'Fjob', 'reason', 'guardian', 'traveltime', 'higher', 'internet', 'famrel', 'goout', 'Dalc', 'Walc', 'health', 'absences', 'M_studytime', 'M_failures', 'M_paid','M_G1', 'M_G2', 'M_G3', 'P_studytime', 'P_failures', 'P_paid', 'P_G1', 'P_G2', 'P_G3']
import csv
with open('students_combined_data.csv', 'w') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=field_names, delimiter=';')
    writer.writeheader()
    for row in records:
        row_data = {}
        for name in field_names:
            row_data[name] = row[name]
        writer.writerow(row_data)'''