In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
import datetime
import pandas as pd

In [2]:
# spark = SparkSession.builder.master('spark://master:7077').appName("Jupyter").getOrCreate()
spark = SparkSession.builder.master('local[1]').appName("Jupyter").getOrCreate()
sc = spark.sparkContext
# sc = SparkContext('local') 
# spark = SparkSession(sc)


In [3]:
df = spark.read.csv('data/accidents_new.csv', header='true', inferSchema = True)

In [4]:
df.columns

['ID',
 'DATE',
 'WEEK_DAY',
 'REG',
 'MUNCP',
 'STREET',
 'TYPE_ACCDN',
 'SURFACE',
 'LIGHT',
 'STR_ASPCT',
 'STR_CONFIG',
 'METEO',
 'GRAVITE',
 'NB_VEH_IMPLIQUES_ACCDN',
 'NB_VICTIMES_TOTAL',
 'NB_MORTS',
 'NB_BLESSES_GRAVES',
 'NB_BLESS_LEGERS',
 'NB_DECES_PIETON',
 'NB_BLESSES_PIETON',
 'NB_VICTIMES_PIETON',
 'NB_DECES_MOTO',
 'NB_BLESSES_MOTO',
 'NB_VICTIMES_MOTO',
 'NB_DECES_VELO',
 'NB_BLESSES_VELO',
 'NB_VICTIMES_VELO',
 'nb_automobile_camion_leger',
 'nb_camionLourd_tractRoutier',
 'nb_outil_equipement',
 'nb_tous_autobus_minibus',
 'nb_bicyclette',
 'nb_cyclomoteur',
 'nb_motocyclette',
 'nb_taxi',
 'nb_urgence',
 'nb_motoneige',
 'nb_VHR',
 'nb_autres_types',
 'nb_veh_non_precise']

In [5]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- WEEK_DAY: string (nullable = true)
 |-- REG: string (nullable = true)
 |-- MUNCP: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- TYPE_ACCDN: integer (nullable = true)
 |-- SURFACE: integer (nullable = true)
 |-- LIGHT: integer (nullable = true)
 |-- STR_ASPCT: integer (nullable = true)
 |-- STR_CONFIG: integer (nullable = true)
 |-- METEO: integer (nullable = true)
 |-- GRAVITE: string (nullable = true)
 |-- NB_VEH_IMPLIQUES_ACCDN: integer (nullable = true)
 |-- NB_VICTIMES_TOTAL: integer (nullable = true)
 |-- NB_MORTS: integer (nullable = true)
 |-- NB_BLESSES_GRAVES: integer (nullable = true)
 |-- NB_BLESS_LEGERS: integer (nullable = true)
 |-- NB_DECES_PIETON: integer (nullable = true)
 |-- NB_BLESSES_PIETON: integer (nullable = true)
 |-- NB_VICTIMES_PIETON: integer (nullable = true)
 |-- NB_DECES_MOTO: integer (nullable = true)
 |-- NB_BLESSES_MOTO: integer (nullable = true)
 |

In [6]:
df = df.select('DATE', 'WEEK_DAY', 'LIGHT', 'METEO')
print('Number of rows: ', df.count())
df.show(10)

Number of rows:  128647
+-------------------+--------+-----+-----+
|               DATE|WEEK_DAY|LIGHT|METEO|
+-------------------+--------+-----+-----+
|2012-02-01 00:00:00|      ME|    1|   11|
|2012-06-28 00:00:00|      JE|    1|   11|
|2012-07-11 00:00:00|      ME|    3|   11|
|2012-01-03 00:00:00|      MA|    1|   11|
|2012-01-05 00:00:00|      JE|    3|   11|
|2012-01-05 00:00:00|      JE|    3|   17|
|2012-01-07 00:00:00|      SA|    1|   11|
|2012-01-09 00:00:00|      LU|    1|   12|
|2012-01-11 00:00:00|      ME|    1|   12|
|2012-01-12 00:00:00|      JE|    2|   18|
+-------------------+--------+-----+-----+
only showing top 10 rows



Switch timestamp to DateTime

In [7]:
df = df.withColumn('DATE', df['DATE'].cast(DateType()))

Extracting month

In [8]:
df = df.withColumn('month', F.month(df.DATE))
df.show()

+----------+--------+-----+-----+-----+
|      DATE|WEEK_DAY|LIGHT|METEO|month|
+----------+--------+-----+-----+-----+
|2012-02-01|      ME|    1|   11|    2|
|2012-06-28|      JE|    1|   11|    6|
|2012-07-11|      ME|    3|   11|    7|
|2012-01-03|      MA|    1|   11|    1|
|2012-01-05|      JE|    3|   11|    1|
|2012-01-05|      JE|    3|   17|    1|
|2012-01-07|      SA|    1|   11|    1|
|2012-01-09|      LU|    1|   12|    1|
|2012-01-11|      ME|    1|   12|    1|
|2012-01-12|      JE|    2|   18|    1|
|2012-01-12|      JE|    3|   18|    1|
|2012-01-13|      VE|    3|   12|    1|
|2012-01-13|      VE|    1|   17|    1|
|2012-01-14|      SA|    3|   12|    1|
|2012-01-14|      SA|    1|   11|    1|
|2012-01-15|      DI|    2|   11|    1|
|2012-01-15|      DI|    2|   11|    1|
|2012-01-18|      ME|    1|   11|    1|
|2012-01-16|      LU|    1|   11|    1|
|2012-01-18|      ME|    3|   11|    1|
+----------+--------+-----+-----+-----+
only showing top 20 rows



Keeping the records where meteo is precised

In [10]:
print(df.count())
df = df.where(df.METEO != 99)
print(df.count())

128143
128143


In [11]:
df.groupby(['date']).count().sort('date', accending=True).show()

+----------+-----+
|      date|count|
+----------+-----+
|2012-01-01|   39|
|2012-01-02|   24|
|2012-01-03|   40|
|2012-01-04|   43|
|2012-01-05|   59|
|2012-01-06|   40|
|2012-01-07|   45|
|2012-01-08|   35|
|2012-01-09|   42|
|2012-01-10|   47|
|2012-01-11|   48|
|2012-01-12|   78|
|2012-01-13|  103|
|2012-01-14|   95|
|2012-01-15|   67|
|2012-01-16|   76|
|2012-01-17|  111|
|2012-01-18|  109|
|2012-01-19|   90|
|2012-01-20|   65|
+----------+-----+
only showing top 20 rows



### Dividing each day into two: day time and night time

- Will first group codes 1 and 2 into DAY (code 1) and 3 and 4 into NIGHT (code 2) 


In [12]:
df.groupby(['date', 'light']).count().sort('date', accending=True).show()

+----------+-----+-----+
|      date|light|count|
+----------+-----+-----+
|2012-01-01|    1|    6|
|2012-01-01|    2|    1|
|2012-01-01|    3|   32|
|2012-01-02|    3|   12|
|2012-01-02|    1|   12|
|2012-01-03|    2|    1|
|2012-01-03|    3|   15|
|2012-01-03|    1|   23|
|2012-01-03|    4|    1|
|2012-01-04|    3|   15|
|2012-01-04|    1|   25|
|2012-01-04|    2|    3|
|2012-01-05|    2|    5|
|2012-01-05|    1|   35|
|2012-01-05|    4|    1|
|2012-01-05|    3|   18|
|2012-01-06|    2|    2|
|2012-01-06|    3|   12|
|2012-01-06|    1|   26|
|2012-01-07|    3|   16|
+----------+-----+-----+
only showing top 20 rows



In [13]:
# row:  DATE|WEEK_DAY|LIGHT|METEO|month
#result: "date", "week_day", "light", "meteo", "month"]

def assign_light(row):
    result = [row[0], row[1], 0, row[3], row[4]]
    day = [1, 2]
    night = [3, 4]
    if row[2] in day:
        result[2] = 1
        result[0] = str(result[0]) + '-d'
    else:
        result[2] = 2
        result[0] = str(result[0]) + '-n'
    return result

In [14]:
%%time

df = df.rdd.map(assign_light).toDF(["date", "week_day", "light", "meteo", "month"])
df.show()

+------------+--------+-----+-----+-----+
|        date|week_day|light|meteo|month|
+------------+--------+-----+-----+-----+
|2012-02-01-d|      ME|    1|   11|    2|
|2012-06-28-d|      JE|    1|   11|    6|
|2012-07-11-n|      ME|    2|   11|    7|
|2012-01-03-d|      MA|    1|   11|    1|
|2012-01-05-n|      JE|    2|   11|    1|
|2012-01-05-n|      JE|    2|   17|    1|
|2012-01-07-d|      SA|    1|   11|    1|
|2012-01-09-d|      LU|    1|   12|    1|
|2012-01-11-d|      ME|    1|   12|    1|
|2012-01-12-d|      JE|    1|   18|    1|
|2012-01-12-n|      JE|    2|   18|    1|
|2012-01-13-n|      VE|    2|   12|    1|
|2012-01-13-d|      VE|    1|   17|    1|
|2012-01-14-n|      SA|    2|   12|    1|
|2012-01-14-d|      SA|    1|   11|    1|
|2012-01-15-d|      DI|    1|   11|    1|
|2012-01-15-d|      DI|    1|   11|    1|
|2012-01-18-d|      ME|    1|   11|    1|
|2012-01-16-d|      LU|    1|   11|    1|
|2012-01-18-n|      ME|    2|   11|    1|
+------------+--------+-----+-----

In [15]:
df.sort('date', accending=True).show(30)

+------------+--------+-----+-----+-----+
|        date|week_day|light|meteo|month|
+------------+--------+-----+-----+-----+
|2012-01-01-d|      DI|    1|   12|    1|
|2012-01-01-d|      DI|    1|   11|    1|
|2012-01-01-d|      DI|    1|   12|    1|
|2012-01-01-d|      DI|    1|   11|    1|
|2012-01-01-d|      DI|    1|   12|    1|
|2012-01-01-d|      DI|    1|   12|    1|
|2012-01-01-d|      DI|    1|   13|    1|
|2012-01-01-n|      DI|    2|   14|    1|
|2012-01-01-n|      DI|    2|   12|    1|
|2012-01-01-n|      DI|    2|   11|    1|
|2012-01-01-n|      DI|    2|   17|    1|
|2012-01-01-n|      DI|    2|   14|    1|
|2012-01-01-n|      DI|    2|   12|    1|
|2012-01-01-n|      DI|    2|   14|    1|
|2012-01-01-n|      DI|    2|   14|    1|
|2012-01-01-n|      DI|    2|   12|    1|
|2012-01-01-n|      DI|    2|   12|    1|
|2012-01-01-n|      DI|    2|   12|    1|
|2012-01-01-n|      DI|    2|   11|    1|
|2012-01-01-n|      DI|    2|   12|    1|
|2012-01-01-n|      DI|    2|   14

### Forming a prediction dataframe

We will have folowing dataframes:

- _dates_ - date and number of accidents
- _df_pred_ - temporary dataframe that will be used in prediction
- _df_p_ - final subset

In [16]:
dates = df.groupby('date', 'light').count().sort('date', accending=True)
dates.show(10)

+------------+-----+-----+
|        date|light|count|
+------------+-----+-----+
|2012-01-01-d|    1|    7|
|2012-01-01-n|    2|   32|
|2012-01-02-d|    1|   12|
|2012-01-02-n|    2|   12|
|2012-01-03-d|    1|   24|
|2012-01-03-n|    2|   16|
|2012-01-04-d|    1|   28|
|2012-01-04-n|    2|   15|
|2012-01-05-d|    1|   40|
|2012-01-05-n|    2|   19|
+------------+-----+-----+
only showing top 10 rows



In [17]:
d_list = list(dates.select('date').toPandas()['date'])              # create a list of unique dates
c_list = list(dates.select('count').toPandas()['count'])            # create a list of the number of collision for each day

### Dealing with METEO

Adding Meteo columns:

-  splitting each code into one column with number of accidents corresponding each code
-  grouping them into fewer columns
-  

In [18]:
df_meteo = df.select('date', 'light', 'week_day', 'month', 'meteo').groupby('date', 'light', 'week_day', 'month', 'meteo').count().sort('date', accending=True)  
df_meteo.show(50)

+------------+-----+--------+-----+-----+-----+
|        date|light|week_day|month|meteo|count|
+------------+-----+--------+-----+-----+-----+
|2012-01-01-d|    1|      DI|    1|   13|    1|
|2012-01-01-d|    1|      DI|    1|   12|    4|
|2012-01-01-d|    1|      DI|    1|   11|    2|
|2012-01-01-n|    2|      DI|    1|   17|    2|
|2012-01-01-n|    2|      DI|    1|   14|    8|
|2012-01-01-n|    2|      DI|    1|   12|   12|
|2012-01-01-n|    2|      DI|    1|   11|   10|
|2012-01-02-d|    1|      LU|    1|   12|    3|
|2012-01-02-d|    1|      LU|    1|   11|    9|
|2012-01-02-n|    2|      LU|    1|   14|    2|
|2012-01-02-n|    2|      LU|    1|   12|    3|
|2012-01-02-n|    2|      LU|    1|   11|    7|
|2012-01-03-d|    1|      MA|    1|   12|    1|
|2012-01-03-d|    1|      MA|    1|   17|    1|
|2012-01-03-d|    1|      MA|    1|   11|   22|
|2012-01-03-n|    2|      MA|    1|   11|   15|
|2012-01-03-n|    2|      MA|    1|   12|    1|
|2012-01-04-d|    1|      ME|    1|   12

Dividing dates list into 3 parts in order to process faster

In [19]:
def assign_light(row):
    # row structure: ['date', 'light', 'count', 'meteo', 'week_day', 'month']
    # result structure: ["day", "night", 'DI', 'LU', 'MA', 'ME', 'JE', 'VE', 'SA', "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "m_11","m_12", "m_13", "m_14", "m_15", "m_16", "m_17", "m_18", "m_19"]
    result = [0 for i in range(30)]
    
    # meteo:
    qty = row[2]                                                    #number of accidents with this meteo code (column count)
    labels = [11,12,13,14,15,16,17,18,19]
    for k in range(len(labels)):
        if row[3] == labels[k]:
            result[k+21] = qty
    
    # light:
    day = 1
    night = 2
    if row[1] == day:
        result[0] = 1
    else:
        result[1] = 1
        
    # week day
    labels = ['DI', 'LU', 'MA', 'ME', 'JE', 'VE', 'SA']
    for k in range(len(labels)):
        if row[4] == labels[k]:
            result[k+2] = 1
            
    # month
    labels = [i for i in range(1,13)]
    for k in range(len(labels)):
        if row[5] == labels[k]:
            result[k+9] = 1
            
    return result


columns = ['date', 'count', "day", "night", 'DI', 'LU', 'MA', 'ME', 'JE', 'VE', 'SA', "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "m-11","m-12", "m-13","m-14", "m-15", "m-16", "m-17", "m-18", "m-19"]   # Setting uo the columns for prediction dataframe
values = [d_list[0]]                                                # Creating temporary row with the first element as a date and the rest as an int
for i in range(len(columns)-1):
    values.append(0)
df_pred = spark.createDataFrame([values],columns)                   # Creating dataframe
df_pred.show()

+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+
|        date|count|day|night| DI| LU| MA| ME| JE| VE| SA|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12|m-11|m-12|m-13|m-14|m-15|m-16|m-17|m-18|m-19|
+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+
|2012-01-01-d|    0|  0|    0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+



### Working on the data in 4 batches: running cycle with differents __i__ values. Each part requires restarting kernel. Results are written into temp files.

In [20]:
%%time
# part-1
    
# for i in range(0, 1500):                                 # go through the array of dates
for i in range(1500, 3000): 
# for i in range(3500, 4000): 
# for i in range(4500, len(d_list)): 
    final_list = [d_list[i], c_list[i]]                           # Forming the row starting by date and number of accidents
    d = df_meteo.select('date', 'light', 'count', 'meteo', 'week_day', 'month').filter(df_meteo.date == d_list[i])  # getting accidents for specific date
    dd = d.rdd.map(assign_light).toDF(["day", "night", 'DI', 'LU', 'MA', 'ME', 'JE', 'VE', 'SA', "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "m_11","m_12", "m_13", "m_14", "m_15", "m_16", "m_17", "m_18", "m_19"])       # dataframe with METEO codes as columns and number of accidents as values
    dd = dd.toPandas()

    for i in range(21):
        if type(dd.iloc[0,i]) == 'string':
            final_list.append(dd.iloc[0,i])
        else:
            final_list.append(int(dd.iloc[0,i]))

    for c in range(21, len(dd.columns)):                         # Reducing dataframe by calculating sum of accidents dor each METEO code for one day
        s = int(dd.iloc[:,c].sum())
        final_list.append(s)
    newRow = spark.createDataFrame([final_list], columns)
    df_pred = df_pred.union(newRow)                   # Adding a row into prediction dataset

df_pred = df_pred.filter(df_pred['count'] != 0) # Removing temporary row
df_pred.show(10)

+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+
|        date|count|day|night| DI| LU| MA| ME| JE| VE| SA|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12|m-11|m-12|m-13|m-14|m-15|m-16|m-17|m-18|m-19|
+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+
|2014-01-20-d|   37|  1|    0|  0|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  21|  14|   0|   0|   0|   0|   1|   1|   0|
|2014-01-20-n|   12|  0|    1|  0|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   6|   3|   0|   0|   0|   0|   3|   0|   0|
|2014-01-21-d|   54|  1|    0|  0|  0|  1|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  54|   0|   0|   0|   0|   0|   0|   0|   0|
|2014-01-21-n|   14|  0|    1|  0|  0|  1|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  

### Writing temp file. Don't forget to choose the line corresponding to the part number!

In [None]:
# df_pred.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/pred_1.csv', header="true")
# df_pred.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/pred_2.csv', header="true")
# df_pred.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/pred_3.csv', header="true")
# df_pred.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/pred_4.csv', header="true")

# df_pred.repartition(1).write.csv(path='/data/pred_1.csv', header="true")
df_pred.repartition(1).write.csv(path='/data/pred_2.csv', header="true")
# df_pred.repartition(1).write.csv(path='/data/pred_3.csv', header="true")
# df_pred.repartition(1).write.csv(path='/pred_4.csv', header="true")

In [21]:
df_pred.count()

1500

### Creating meteo-features: grouping codes into 4 groups and assigning 1 to the one that gets the most amount of collisions

In [22]:
%%time

def meteo(row):
    # row structure: ['date', 'count', 'day', 'night', 'DI', 'LU', 'MA', 'ME', 'JE', 'VE', 'SA', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', 'm-11', 'm-12', 'm-13', 'm-14', 'm-15', 'm-16', 'm-17', 'm-18', 'm-19']
#     result = [row[i] for i in range(23)]
    
    def meteo(a,b,c):
        n = 0
        if a != 0: 
            n = n + 1
        if b != 0:
            n = n + 1
        if c != 0:
            n = n + 1

        if n != 0:
            return((a + b + c) // n)
        else:
            return(a + b + c)

    meteo_list = [0 for i in range(4)]
    meteo_list[0] = meteo(row[0], row[1], 0)        # m_11 and m_12 to one group with average qty in not 0 (normal)
    meteo_list[1] = meteo(row[2], row[3], row[4])  # m_13, m_14 and m_15 (rain)
    meteo_list[2] = meteo(row[5], row[6], row[7])  # m_16, m_17 and m_18 (snow)
    meteo_list[3] = int(row[8])                           # m_19 not grouped (ice)
    m = pd.Series(meteo_list).max()
    
    result = [0 if i != m else 1 for i in meteo_list]
    
#     for i in meteo_list:
#         result.append(i)
    return result

print(df_pred.columns)
ddd = df_pred.select('m-11', 'm-12', 'm-13', 'm-14', 'm-15', 'm-16', 'm-17', 'm-18', 'm-19')
df_add = ddd.rdd.map(meteo).toDF(['11-12', '13-14-15', '16-17-18', '19'])


['date', 'count', 'day', 'night', 'DI', 'LU', 'MA', 'ME', 'JE', 'VE', 'SA', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', 'm-11', 'm-12', 'm-13', 'm-14', 'm-15', 'm-16', 'm-17', 'm-18', 'm-19']
CPU times: user 40 ms, sys: 0 ns, total: 40 ms
Wall time: 26.6 s


In [23]:
df_pred.show()

+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+
|        date|count|day|night| DI| LU| MA| ME| JE| VE| SA|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12|m-11|m-12|m-13|m-14|m-15|m-16|m-17|m-18|m-19|
+------------+-----+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+
|2014-01-20-d|   37|  1|    0|  0|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  21|  14|   0|   0|   0|   0|   1|   1|   0|
|2014-01-20-n|   12|  0|    1|  0|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   6|   3|   0|   0|   0|   0|   3|   0|   0|
|2014-01-21-d|   54|  1|    0|  0|  0|  1|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  54|   0|   0|   0|   0|   0|   0|   0|   0|
|2014-01-21-n|   14|  0|    1|  0|  0|  1|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  

In [24]:
df_add.show()

+-----+--------+--------+---+
|11-12|13-14-15|16-17-18| 19|
+-----+--------+--------+---+
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    0|       0|       1|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    0|       0|       1|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
|    1|       0|       0|  0|
+-----+--------+--------+---+
only showing top 20 rows



Adding indexes to both of the temp datasets to be able to merge them

In [25]:
from pyspark.sql.functions import monotonically_increasing_id 

df_pred = df_pred.select("*").withColumn("index", monotonically_increasing_id())
df_add = df_add.select("*").withColumn("index", monotonically_increasing_id())

In [None]:
df_pred.count()

In [None]:
df_add.count()

In [None]:
df_p = df_pred.join(df_add, df_pred.index == df_add.index)
df_p.show()

In [None]:
df_p = df_p.drop('m-11', 'm-12', 'm-13', 'm-14', 'm-15', 'm-16', 'm-17', 'm-18', 'm-19')
df_p = df_p.drop(df_add['index'])
df_p = df_p.drop(df_pred['index']).sort('date', acsending=True)

In [None]:
df_p.show(20)

### Writing the final file. Don't forget to choose the line corresponding to the part number!

In [None]:
# df_p.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/final_part_1.csv', header="true")
# df_p.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/final_part_2.csv', header="true")
# df_p.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/final_part_3.csv', header="true")
# df_p.repartition(1).write.csv(path='gs://jupyter-empty-global/notebooks/jupyter/data/final_part_4.csv', header="true")

# df_p.repartition(1).write.csv(path='/data/final_part_1.csv', header="true")
df_p.repartition(1).write.csv(path='/data/final_part_2.csv', header="true")
# df_p.repartition(1).write.csv(path='/data/final_part_3.csv', header="true")
# df_p.repartition(1).write.csv(path='/data/final_part_4.csv', header="true")

### Joining all the parts together

In [7]:
# %%time

# df_pred_1 = spark.read.csv('data/final_part_1.csv', header='true', inferSchema = True)
# df_pred_2 = spark.read.csv('data/final_part_2.csv', header='true', inferSchema = True)
# df_pred_3 = spark.read.csv('data/final_part_3.csv', header='true', inferSchema = True)
# df_pred_4 = spark.read.csv('data/final_part_4.csv', header='true', inferSchema = True)

# from functools import reduce  # For Python 3.x
# from pyspark.sql import DataFrame

# def unionAll(*dfs):
#     return reduce(DataFrame.unionAll, dfs)

# df_all = unionAll(df_pred_1, df_pred_2, df_pred_3, df_pred_4)

In [8]:
# df_all = df_pred_1.union(df_pred_2).union(df_pred_3).union(df_pred_4)
# df_all.count()
# df_all.show(10)

In [9]:
# df_all.repartition(1).write.csv(path='/data/final_all.csv', header="true")

In [11]:
# sc.stop()