In [50]:
import pixiedust

from math import sqrt, log, exp
import csv
import random
from datetime import datetime

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import udf

# For categorical variables
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression, LinearRegressionModel


from pyspark.sql.functions import col, udf

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

conf = (SparkConf()
 .setMaster("local")
 .setAppName("Cleaner")
 .set("spark.executor.memory", "1g"))

pixiedust.enableJobMonitor()

Spark Job Progress Monitor already enabled


In [19]:
def lengthInspector(rdd):
    lenCounter = rdd.map(lambda x: (len(x), 1))\
                 .reduceByKey(lambda x, y: x + y)
    lenList = lenCounter.collect()
    return lenList

In [20]:
def buildMainKey(line):
    mainKey = "Farm " + line[1].strip() + " House " + line[2].strip() + " Flock " + line[3].strip()
    newline = [mainKey] + line
    return newline

In [21]:
def dataTaker(mainKey, dataIndex):
    """
    Return a list of data under given main key and data idx
    """
    if type(mainKey) is int:
        mainKey = numMainKey[mainKey]
    if type(dataIndex) is str:
        for key in numDataIndex:
            if dataIndex == numDataIndex[key]: 
                dataIndex = key
                break
    
    result = cleanData.filter(lambda line: line[0] == mainKey)\
                      .map(lambda line: line[dataIndex])\
                      .collect() # dataIndex should not minus 1 because main key is line[0]
    return result

In [22]:
def reduceDot(linex, liney):
    new_list = []
    for i in range(len(linex)):
        new_list.append(linex[i] or liney[i])
    return new_list

In [23]:
def numberCleaner(line):
    """
    This function needs to be improved
    """
    resultList = []
    item = None
    for idx in range(len(line)):
        if idx <= 12 or idx == 38:
            if idx == 6: 
                if not line[idx]: item = -1
                else:
                    item = line[idx].strip().replace(',', '').replace(' ', '')
                    item = int(item)
            else:
                if not line[idx]: item = ""
                else: item = line[idx]
        elif checkAllDot[idx]:
            if not line[idx]: item = -0.1
            else:
                item = line[idx].strip().replace(',', '').replace(' ', '')
                item = float(item)
        else:
            if not line[idx]: item = -1
            else:
                item = line[idx].strip().replace(',', '').replace(' ', '').replace('"', '')
                item = int(item)
        resultList.append(item)
    return resultList

In [24]:
def transferNone(line):
    new_line = []
    for item in line:
        if type(item) != str:
            if item < 0: new_line.append(None)
            else: new_line.append(item)
        else:
            if not item: new_line.append(None)
            else:new_line.append(item)
    return new_line

In [25]:
def selectArea(Main_Keys, inCols):
    selectRows = udf(lambda x: x in Main_Keys, BooleanType())
    area = broilersDF.filter(selectRows(broilersDF.Main_Key)).select(inCols)
    return area

In [26]:
def selectRDD(inRows, inCols):
    """
    !!!Replace dataTaker!!!
    
    Return RDD containing given main keys and data idxs
    """
    newRows = []
    newCols = []
    for row in inRows:
        if type(row) is int: newRows.append(numMainKey[row])
        else: newRows.append(row)
    for col in inCols:
        if type(col) is str:
            for key in numDataIndex:
                if col == numDataIndex[key]:
                    newCols.append(key)
                    break
        else: newCols.append(col)
    
    newRDD = broilers.filter(lambda line: line[0] in newRows)\
                      .map(lambda line: [line[idx] for idx in newCols])
    #result = sqlContext.createDataFrame(newRDD, inCols)
    return inCols, newRDD

In [27]:
def transformRDD(inputRDD, inCols):
    outCols = inCols + ['square', 'sqrt', 'ln', 'exp']
    trRDD = inputRDD.filter(lambda x: x[0])\
                    .map(lambda x: [x[0], x[1]] if x[1] else [x[0], 0])\
                    .map(lambda x: [x[0], x[1], x[0] ** 2, sqrt(x[0]), log(x[0]), exp(x[0])])
    outDF = sqlContext.createDataFrame(trRDD, outCols)
    return outDF

In [28]:
def Do_Machine_Learning(trainingSet, testSet, xCols, yValues, regressor = LinearRegression(),
                         paramGrid = [], evalMetric = "rmse", seed = None):
    """
    Return (<object> model, <float> error_estimate, <dataframe> result)
    
    trainingSet: dataframe, used to train model
    testSet: dataframe, used to feed the model and get the result(and error estimate)
    xCols: list of strings, names of columns which used as inputs
    yValues: string, name of column that contains dependent values 
    regressor: Regression object, by default = LinearRegression()
    paramGrid: list built byParamGridBuilder, by default = empty list
    evalMetric: string, name of matrix used for evaluation, by default = "rmse"
    seed: int or None, seed for random number generator, if == None will use random numbers
    
    !!! seed is useless at that time !!!
    """
    # push estimator into pipeline
    vec = VectorAssembler(inputCols = xCols, outputCol = "features")
    regPipeline = Pipeline()
    regPipeline.setStages([vec, regressor])   
    # build evaluator
    regEval = RegressionEvaluator(predictionCol = "Predicted", labelCol = yValues, metricName = evalMetric)
    # combine estimator and evaluator to a cross validator
    crossval = CrossValidator(estimator = regPipeline, evaluator = regEval, numFolds = 3)
    # set parameters grid
    crossval.setEstimatorParamMaps(paramGrid)
    # trainning
    regModel = crossval.fit(trainingSet).bestModel
    # predicting
    predictions = regModel.transform(testSet)
    # get evaluating result
    evaluation = regEval.evaluate(predictions)
    
    return regModel, evaluation, predictions

In [38]:
with open('Project/FF_broilers_v2.csv', 'r') as fin:
    with open('Project/FF_broilers_v2_tab.txt', 'w') as fout:
        reader = csv.DictReader(fin)
        writer = csv.DictWriter(fout, reader.fieldnames, delimiter='\t')
        writer.writeheader()
        writer.writerows(reader)

In [39]:
"""
Extract head and content from file
"""
rdd = sc.textFile('Project/FF_broilers_v2_tab.txt')

lineSpliter = rdd.map(lambda x: x.split('\t'))
print("The file has {0} lines\n".format(lineSpliter.count()))

oldHeader = ['MainKey']

header = ['Main_Key', 'Date', 'Customer_Code', 'House', 'Flock', 'Gene_Line', 'Birds_Begin', 'Hatch_Date', 'Arrive_Date',\
 'Remove_date', 'Deactivate_Date', 'Veterinarian', 'Hatchery', 'Age', 'Birds_Present', 'Birds_thinned', 'Death',\
 'Death_Cum', 'Total_Death_Rate', 'Alive_Rate', 'Body_Weight_g', 'Uniformity_Rate', 'Daily_Gaing', 'Avg_Daily_Gaing_Per_Day',\
 'Feed_Intake_Per_house_kg', 'FCR_Cum', 'Wheat_Per_Bird_Cum', 'Wheat_Per_Bird', 'Wheat_Per_Day',\
 'Feed_Intake_Per_Bird_Housed_Cum_kg', 'Feed_Intake_Per_Bird_g', 'Wheat_g', 'FCR', 'Water_l', 'Water_Intake_Per_Bird_ml',\
 'Water_Intake_Per_Bird_Cum_l', 'Water_Feed', 'Water_FeedCum', 'Comment']

# Codes in for-loop is useless now
for col in lineSpliter.take(1)[0]:
    oldHeader.append(col.strip()
                     .replace(' ', '')
                     .replace('#', '')
                     .replace('(', '')
                     .replace(')', '')
                     .replace('.', '')
                     .replace('/', 'Per')
                     .replace('%', 'Percentage'))

print("The header of this file is \n{0}\n".format(header))
print("The header of this file has {0} lines".format(len(header)))

data = lineSpliter.filter(lambda x: x[0] != "Date")
#print("Data in this file has {0} lines".format(data.count()))

lenList = lengthInspector(data)
print("Distribution of length of lines in this file: ")
print(lenList)

"""
Clean the empty lines, calculate main key and clean illegal main key
"""

emptyClener = data.filter(lambda x: len(x) >= 38)
#print("Not empty data has {0} lines".format(emptyClener.count()))

mainKeyCounter = emptyClener.map(lambda x: buildMainKey(x))\
                            .map(lambda x: (x[0], 1))\
                            .reduceByKey(lambda x, y: x + y)
print("\nThere are {0} different main keys".format(mainKeyCounter.count()))

errorMainKeyCleaner = emptyClener.filter(lambda x: False if x[3] == "" else True)
print("There are {0} lines with vaild main key".format(errorMainKeyCleaner.count()))

cleanData = errorMainKeyCleaner.map(lambda x: buildMainKey(x))
cleanMainKeyCounter = cleanData.map(lambda x: (x[0], 1))\
                               .reduceByKey(lambda x, y: x + y)
mainKeysWithCounts = cleanMainKeyCounter.collect()
mainKeys = cleanMainKeyCounter.map(lambda x: x[0]).collect()
#print("The main keys are:")
for mainkey in mainKeys:
    pass#print(mainkey)

The file has 3679 lines

The header of this file is 
['Main_Key', 'Date', 'Customer_Code', 'House', 'Flock', 'Gene_Line', 'Birds_Begin', 'Hatch_Date', 'Arrive_Date', 'Remove_date', 'Deactivate_Date', 'Veterinarian', 'Hatchery', 'Age', 'Birds_Present', 'Birds_thinned', 'Death', 'Death_Cum', 'Total_Death_Rate', 'Alive_Rate', 'Body_Weight_g', 'Uniformity_Rate', 'Daily_Gaing', 'Avg_Daily_Gaing_Per_Day', 'Feed_Intake_Per_house_kg', 'FCR_Cum', 'Wheat_Per_Bird_Cum', 'Wheat_Per_Bird', 'Wheat_Per_Day', 'Feed_Intake_Per_Bird_Housed_Cum_kg', 'Feed_Intake_Per_Bird_g', 'Wheat_g', 'FCR', 'Water_l', 'Water_Intake_Per_Bird_ml', 'Water_Intake_Per_Bird_Cum_l', 'Water_Feed', 'Water_FeedCum', 'Comment']

The header of this file has 39 lines
Distribution of length of lines in this file: 
[(38, 3677), (1, 1)]

There are 77 different main keys
There are 3673 lines with vaild main key


In [61]:
mainKeyArror = range(1, 74)
numMainKey = dict(zip(mainKeyArror, mainKeys))
print(numMainKey)

dataIndexArror = range(39)
numDataIndex = dict(zip(dataIndexArror, header))
print(numDataIndex)

{1: 'Farm A House 1 Flock 2016-07', 2: 'Farm A House 1 Flock 2016-10', 3: 'Farm A House 1 Flock 2016-12', 4: 'Farm A House 1 Flock 2017-02', 5: 'Farm B House 3 Flock 2016-11', 6: 'Farm B House 2 Flock 2016-11', 7: 'Farm B House 1 Flock 2016-11', 8: 'Farm B House 2 Flock 2016-12', 9: 'Farm B House 1 Flock 2016-12', 10: 'Farm B House 3 Flock 2016-12', 11: 'Farm B House 2 Flock 2017-01', 12: 'Farm B House 1 Flock 2017-01', 13: 'Farm B House 3 Flock 2017-01', 14: 'Farm B House 2 Flock 17-3', 15: 'Farm B House 3 Flock 17-03-2017', 16: 'Farm B House 1 Flock 17-3', 17: 'Farm C House 1 Flock 2016-09', 18: 'Farm C House 2 Flock 2016-09', 19: 'Farm C House 1 Flock Koppel 11', 20: 'Farm C House 2 Flock Koppel 11 stal 2', 21: 'Farm C House 2 Flock Beter Leven Stal 2 ronde 12', 22: 'Farm C House 1 Flock Beter Leven Stal 1 ronde 12', 23: 'Farm D House 1 Flock 2016-07', 24: 'Farm D House 2 Flock 2016-07', 25: 'Farm D House 3 Flock 2016-07', 26: 'Farm D House 2 Flock 2016-09', 27: 'Farm D House 3 Floc

In [66]:
checkAllDot = cleanData.map(lambda x: [('.' in i) for i in x])\
                       .reduce(lambda x, y: reduceDot(x, y))
print(checkAllDot)

numCleanData = cleanData.map(numberCleaner)

broilers = numCleanData.map(transferNone)

broilersDF = sqlContext.createDataFrame(numCleanData, header)

display(broilersDF)

[False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, True, True, False, False, False, False, False, True, True, True, True, True, True, True, True, True, True, True, True, True, True]


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# build indexer
onehotencoder = OneHotEncoder(inputCol='Gene_Line', outputCol='onehotencoded_gene_line')

# transform the data
df_onehotencoder = onehotencoder.transform(broilerDF)

# resulting df
df_onehotencoder.show()

In [None]:
test = ['Age', 'Death']

testCols, testRDD = selectRDD(mainKeys[1:3], test)
trainCols, trainRDD = selectRDD(mainKeys[2:], test)

train, test = broilersDF.randomSplit([8.0, 2.0], 940309160050)
train = train.cache()
test = test.cache()


In [78]:
# This cell is for preparing datasets used in regression

print(testRDD.take(20))

testDF = transformRDD(testRDD, testCols)
trainDF = transformRDD(trainRDD, trainCols)
#trl = ['Age', 'Death', (math.log(sample_test.Death)).alias('ln'), (sample_test.Death ** 0.5).alias('Death_sqrt')]
#print("attributes of the column are: {}".format(dir(sample_test.Death)))
#trDF = sample_test.select('Age', 'Death', (math.log(sample_test.Death)).alias('ln'), (sample_test.Death ** 0.5).alias('Death_sqrt'))
testDF.show()
trainDF.show()
print(trainRDD.filter(lambda x: not x[0]).filter(lambda x: x[0]).count())

[[0, 0], [1, 0], [2, 52], [3, 28], [4, 81], [5, 29], [6, 20], [7, 19], [8, 8], [9, 15], [10, 13], [11, 15], [12, 24], [13, 15], [14, 13], [15, 7], [16, 24], [17, 18], [18, 14], [19, 16]]
+---+-----+------+------------------+------------------+--------------------+
|Age|Death|square|              sqrt|                ln|                 exp|
+---+-----+------+------------------+------------------+--------------------+
|  1|    0|     1|               1.0|               0.0|   2.718281828459045|
|  2|   52|     4|1.4142135623730951|0.6931471805599453|    7.38905609893065|
|  3|   28|     9|1.7320508075688772|1.0986122886681098|  20.085536923187668|
|  4|   81|    16|               2.0|1.3862943611198906|  54.598150033144236|
|  5|   29|    25|  2.23606797749979|1.6094379124341003|   148.4131591025766|
|  6|   20|    36| 2.449489742783178| 1.791759469228055|   403.4287934927351|
|  7|   19|    49|2.6457513110645907|1.9459101490553132|  1096.6331584284585|
|  8|    8|    64|2.8284271247461

In [47]:
# build regressor
lr1 = LinearRegression()
lr1.setPredictionCol("Predicted")\
   .setLabelCol("Death")

# build parameter grid
regParam = [x / 100.0 for x in range(1, 10)]
pg1 = (ParamGridBuilder()
             .addGrid(lr1.regParam, regParam)
             .build())

print(type(lr1))
print(type(pg1))

# run ML
model1, result1, predictionDF1 = Do_Machine_Learning(trainDF, testDF, ["Age", "ln"], "Death", lr1, pg1)
model2, result2, predictionDF2 = Do_Machine_Learning(trainDF, testDF, ["Age", "square"], "Death", lr1, pg1)
model3, result3, predictionDF3 = Do_Machine_Learning(trainDF, testDF, ["Age", "exp"], "Death", lr1, pg1)


# print attributions of model
"""
print("attributes of the model are: {}".format(dir(model)))
print("method list: {}".format([method for method in dir(model) if callable(getattr(model, method))]))
print(model.stages)
"""

# Print coefficients and intercept
weights1 = model1.stages[1].coefficients
ic1 = model1.stages[1].intercept
weights2 = model2.stages[1].coefficients
ic2 = model2.stages[1].intercept
weights3 = model3.stages[1].coefficients
ic3 = model3.stages[1].intercept
print(weights1, weights2, weights3)
print(ic1, ic2, ic3)
#print(list(zip(["Age"], weights1)))
#print(model.stages[1].intercept)

# print error and result
print("Mean Squared Error: {0:2.2f}, {1:2.2f}, {2:2.2f}\n".format(result1, result2, result3))
predictionDF1.show()
predictionDF2.show()
predictionDF3.show()

# print the model
# print(model.stages)

<class 'pyspark.ml.regression.LinearRegression'>
<class 'list'>
[0.187461415354,-10.0797865496] [-1.16733277735,0.0135686779989] [-0.286882289482,4.8790878619e-38]
40.09637783602841 32.48891692253047 22.71960206201307
Mean Squared Error: 13.83, 13.79, 14.98

+---+-----+------+------------------+------------------+--------------------+--------------------+------------------+
|Age|Death|square|              sqrt|                ln|                 exp|            features|         Predicted|
+---+-----+------+------------------+------------------+--------------------+--------------------+------------------+
|  1|    0|     1|               1.0|               0.0|   2.718281828459045|           [1.0,0.0]| 40.28383925138232|
|  2|   52|     4|1.4142135623730951|0.6931471805599453|    7.38905609893065|[2.0,0.6931471805...| 33.48452503926224|
|  3|   28|     9|1.7320508075688772|1.0986122886681098|  20.085536923187668|[3.0,1.0986122886...|29.584984711591332|
|  4|   81|    16|               

In [None]:
display()

In [None]:
categorical_columns = ['Gene_Line', 'MainKey']

##=== build stages ======
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='stringindexed_' + c) for c in categorical_columns]
onehotencoder_stages = [OneHotEncoder(inputCol='stringindexed_' + c, outputCol='onehotencoded_' + c) for c in categorical_columns]
all_stages = stringindexer_stages + onehotencoder_stages

## build pipeline model
pipeline = Pipeline(stages=all_stages)

## fit pipeline model
pipeline_mode = pipeline.fit(broilerDF)

## transform data
df_coded = pipeline_mode.transform(df)


## remove uncoded columns
# selected_columns = ['onehotencoded_' + c for c in categorical_columns] + ['x4', 'y1', 'y2']
cols = broilerDF.schema.names
cols_to_keep = [col for col in cols if col not in categorical_columns]
df_coded = df_coded.select(cols_to_keep)