In [9]:
#import findspark
#findspark.init()

import pandas as pd
from pyspark.sql.functions import udf, monotonically_increasing_id
from datetime import datetime
import random
from word2number import w2n

from pyspark.sql import DataFrameStatFunctions as statFunc
import pyspark.sql.functions as func
import re
from pyspark import SparkContext, SparkConf, SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)


In [None]:
sqlContext.sql

In [35]:
df_pandas = pd.DataFrame({"Input 1" : ['Green','Green','Blue','Red','Yellow','Red','Red','Blue'],
                          "Input 2" : ['1','0','4','5','7','9','6','5'],
                          "Input 3" : ['1.3','1.445','1.2','1.3','1.325','1.4','1.72158','1'],
                          "Description" : ['DescriptionONE','DescriptionONE','DescriptionTWO','Description THREE','Description Four','DescriptionONE','Description THREE','DescriptionONE']   
                         })
df1 = sqlContext.createDataFrame(df_pandas)

In [45]:
df1.show()

+-----------------+-------+-------+-------+
|      Description|Input 1|Input 2|Input 3|
+-----------------+-------+-------+-------+
|   DescriptionONE|  Green|      1|    1.3|
|   DescriptionONE|  Green|      0|  1.445|
|   DescriptionTWO|   Blue|      4|    1.2|
|Description THREE|    Red|      5|    1.3|
| Description Four| Yellow|      7|  1.325|
|   DescriptionONE|    Red|      9|    1.4|
|Description THREE|    Red|      6|1.72158|
|   DescriptionONE|   Blue|      5|      1|
+-----------------+-------+-------+-------+



In [377]:
def step_2a(string):
    return re.sub('Description THREE', 'DescriptionTHREE', string)
step_2a=func.udf(step_2a)

def step_2b(string):
    return re.sub('Description Four', 'DescriptionFOUR', string)
step_2b=func.udf(step_2b)

df2 = df1.withColumn('Description', step_2b(step_2a(df1['Description'])))
df2.show()

+----------------+-------+-------+-------+
|     Description|Input 1|Input 2|Input 3|
+----------------+-------+-------+-------+
|  DescriptionONE|  Green|      1|    1.3|
|  DescriptionONE|  Green|      0|  1.445|
|  DescriptionTWO|   Blue|      4|    1.2|
|DescriptionTHREE|    Red|      5|    1.3|
| DescriptionFOUR| Yellow|      7|  1.325|
|  DescriptionONE|    Red|      9|    1.4|
|DescriptionTHREE|    Red|      6|1.72158|
|  DescriptionONE|   Blue|      5|      1|
+----------------+-------+-------+-------+



In [258]:
df3 = df2.withColumn('Input 3', df2['Input 3'].cast(DoubleType()))
df3 = df3.withColumn('Input 3', func.format_number(df3['Input 3'], 4))
df3 = df3.withColumn('Input 3', df3['Input 3'].cast(StringType()))

df3.show()

+----------------+-------+-------+-------+
|     Description|Input 1|Input 2|Input 3|
+----------------+-------+-------+-------+
|  DescriptionONE|  Green|      1| 1.3000|
|  DescriptionONE|  Green|      0| 1.4450|
|  DescriptionTWO|   Blue|      4| 1.2000|
|DescriptionTHREE|    Red|      5| 1.3000|
| DescriptionFOUR| Yellow|      7| 1.3250|
|  DescriptionONE|    Red|      9| 1.4000|
|DescriptionTHREE|    Red|      6| 1.7216|
|  DescriptionONE|   Blue|      5| 1.0000|
+----------------+-------+-------+-------+



In [67]:
df_pandas = pd.DataFrame({'col1' : ['Green','Yellow','Red','Blue'],
                          'col2' : ['Night','Morning','Afternoon','Evening']   
                         })
df4 = sqlContext.createDataFrame(df_pandas)
df5 = df4.withColumnRenamed("col1", "Input 1")\
         .withColumnRenamed("col2", "Day Period")
df5.show()


+-------+----------+
|Input 1|Day Period|
+-------+----------+
|  Green|     Night|
| Yellow|   Morning|
|    Red| Afternoon|
|   Blue|   Evening|
+-------+----------+



In [199]:
df6 = df3.join(df5, "Input 1", "left")
df6.show()

+-------+----------------+-------+-------+----------+
|Input 1|     Description|Input 2|Input 3|Day Period|
+-------+----------------+-------+-------+----------+
|  Green|  DescriptionONE|      1| 1.3000|     Night|
|  Green|  DescriptionONE|      0| 1.4450|     Night|
|   Blue|  DescriptionTWO|      4| 1.2000|   Evening|
|   Blue|  DescriptionONE|      5| 1.0000|   Evening|
| Yellow| DescriptionFOUR|      7| 1.3250|   Morning|
|    Red|DescriptionTHREE|      5| 1.3000| Afternoon|
|    Red|  DescriptionONE|      9| 1.4000| Afternoon|
|    Red|DescriptionTHREE|      6| 1.7216| Afternoon|
+-------+----------------+-------+-------+----------+



In [369]:
def step_7(idx):
    random.seed(idx)
    test = date.fromordinal(random.randint(0, date.today().toordinal()))
    return str(test)
step_7=func.udf(step_7)


df7 = df6.withColumn("temp_index", monotonically_increasing_id())
df7 = df7.withColumn("Date", step_7(df7['temp_index'])).drop("temp_index")
#df7 = df7.withColumn('Date', df7['Date'])
df7.show()

+-------+----------------+-------+-------+----------+----------+
|Input 1|     Description|Input 2|Input 3|Day Period|      Date|
+-------+----------------+-------+-------+----------+----------+
|  Green|  DescriptionONE|      1| 1.3000|     Night|0843-07-30|
|  Green|  DescriptionONE|      0| 1.4450|     Night|1691-05-24|
|   Blue|  DescriptionTWO|      4| 1.2000|   Evening|0298-07-15|
|   Blue|  DescriptionONE|      5| 1.0000|   Evening|0505-07-31|
| Yellow| DescriptionFOUR|      7| 1.3250|   Morning|1448-04-15|
|    Red|DescriptionTHREE|      5| 1.3000| Afternoon|0970-05-02|
|    Red|  DescriptionONE|      9| 1.4000| Afternoon|1952-06-02|
|    Red|DescriptionTHREE|      6| 1.7216| Afternoon|1056-04-07|
+-------+----------------+-------+-------+----------+----------+



In [314]:
df8 = df7.filter((func.col('Input 3') >= 1.31))\
         .filter((func.col('Input 1') == 'Red') | (func.col('Input 1') == 'Green'))

df8.show()

+-------+----------------+-------+-------+----------+----------+
|Input 1|     Description|Input 2|Input 3|Day Period|      Date|
+-------+----------------+-------+-------+----------+----------+
|  Green|  DescriptionONE|      0| 1.4450|     Night|1691-05-24|
|    Red|  DescriptionONE|      9| 1.4000| Afternoon|1952-06-02|
|    Red|DescriptionTHREE|      6| 1.7216| Afternoon|1056-04-07|
+-------+----------------+-------+-------+----------+----------+



In [341]:
df9 = df8.withColumn('date2', func.unix_timestamp(df8["Date"].cast(DateType())))
df9.show()
middle_date = statFunc(df9).approxQuantile("date2", [0.5], 0)

df9 = df9.withColumn("flag", func.when((df9["date2"] > middle_date[0]) & \
                                       (df9["Input 2"] > 1),
                                  value=1).otherwise(0)).drop('date2')
df9.show()

+-------+----------------+-------+-------+----------+----------+------------+
|Input 1|     Description|Input 2|Input 3|Day Period|      Date|       date2|
+-------+----------------+-------+-------+----------+----------+------------+
|  Green|  DescriptionONE|      0| 1.4450|     Night|1691-05-24| -8791959600|
|    Red|  DescriptionONE|      9| 1.4000| Afternoon|1952-06-02|  -554846400|
|    Red|DescriptionTHREE|      6| 1.7216| Afternoon|1056-04-07|-28834167600|
+-------+----------------+-------+-------+----------+----------+------------+

+-------+----------------+-------+-------+----------+----------+----+
|Input 1|     Description|Input 2|Input 3|Day Period|      Date|flag|
+-------+----------------+-------+-------+----------+----------+----+
|  Green|  DescriptionONE|      0| 1.4450|     Night|1691-05-24|   0|
|    Red|  DescriptionONE|      9| 1.4000| Afternoon|1952-06-02|   1|
|    Red|DescriptionTHREE|      6| 1.7216| Afternoon|1056-04-07|   0|
+-------+----------------+-------

In [408]:
def step_11(df1, df2):
    try:
        # Step 5 
        df2 = df2.withColumnRenamed("col1", "Input 1")\
                 .withColumnRenamed("col2", "Day Period")
    except:
        print("Code failed on step 5")
    
    # Step 6
    try:
        df = df1.join(df2, "Input 1", "left")
    except:
        print("Code failed on step 6")
    
    # Step 7 
    try:
        def step_7(idx):
            random.seed(idx)
            test = date.fromordinal(random.randint(0, date.today().toordinal()))
            return str(test)
        step_7 = func.udf(step_7)
        df = df.withColumn("temp_index", monotonically_increasing_id())
        df = df.withColumn("Date", step_7(df['temp_index'])).drop("temp_index")
    except:
        print("Code failed on step 7")
    
    # Step 8
    try:
        df = df.filter((func.col('Input 3') >= 1.31))\
               .filter((func.col('Input 1') == 'Red') | (func.col('Input 1') == 'Green'))
    except:
        print("Code failed on step 8")
    
    # Step 9
    try:
        df = df.withColumn('temp_date', func.unix_timestamp(df["Date"].cast(DateType())))
        middle_date = statFunc(df).approxQuantile("temp_date", [0.5], 0)
        df = df.withColumn("flag", func.when((df["temp_date"] > middle_date[0]) & \
                                             (df["Input 2"] > 1),
                                          value=1).otherwise(0)).drop('temp_date')
    except:
        print('Code failed on step 9')
    
    # Step 10
    try:
        result_list = {}
        for i,row in enumerate(df.rdd.collect()):
            input3 = row['Input 3']
            desc = w2n.word_to_num(row['Description'].split('Description')[1])
            input2_min = df.agg({"Input 2": "min"}).collect()[0]['min(Input 2)']
            try:
                result = (float(input3) + float(desc)) / (float(input2_min))
            except ZeroDivisionError:
                result = None
            result_df = sqlContext.createDataFrame([result], "string").toDF("result_{}".format(i))
            result_df.show()
            result_list['result_{}'.format(i)] = result_df
        return result_list
    except:
        print("Code failed on step 10")
        return None

final = step_11(df1=df3, df2=df4)

+--------+
|result_0|
+--------+
|    null|
+--------+

+--------+
|result_1|
+--------+
|    null|
+--------+

+--------+
|result_2|
+--------+
|    null|
+--------+



In [405]:
final['result_0'].show()

+--------+
|result_0|
+--------+
|    null|
+--------+

