In [70]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import fastdtw
from scipy.spatial.distance import euclidean

In [37]:
# Reuse the active SparkSession (e.g. the one a pyspark shell provides) or start
# one, so the notebook also runs from a fresh kernel. RDD.toDF (used below) is
# only available once a SparkSession exists.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Raw ';'-separated text file, one RDD element per line (header included).
data = sc.textFile('household_power_consumption.txt')

In [38]:
# Column names are the ';'-separated fields of the file's first line.
header = data.first().split(sep=';')

In [39]:
# Split each line into its ';'-separated fields and drop the header row.
# NOTE: this rebinds `data`, so re-running this cell on its own fails (the
# elements are already lists); re-run from the textFile cell instead.
data = (
    data
    .map(lambda line: line.split(';'))
    .filter(lambda fields: fields != header)
)

In [95]:
# Name the columns after the header fields; every value is still a string.
df = data.toDF(schema=header)

In [96]:
# Collect every row to the driver as a pandas DataFrame. No limit() is needed:
# limiting to df.count() rows returns the same rows but costs an extra full
# pass over the data just to count them.
# NOTE(review): this materializes the whole dataset in driver memory.
df_pandads = df.toPandas()

In [97]:
# Per-column count of None/NaN cells. Every column is still a raw string here,
# so missing values encoded as a placeholder token would not be counted.
# TODO confirm how the source file marks missing readings.
df_pandads.isna().sum()

Date                     0
Time                     0
Global_active_power      0
Global_reactive_power    0
Voltage                  0
Global_intensity         0
Sub_metering_1           0
Sub_metering_2           0
Sub_metering_3           0
dtype: int64

In [62]:
# Show the inferred schema: toDF over lists of split strings leaves every column as string.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Global_active_power: string (nullable = true)
 |-- Global_reactive_power: string (nullable = true)
 |-- Voltage: string (nullable = true)
 |-- Global_intensity: string (nullable = true)
 |-- Sub_metering_1: string (nullable = true)
 |-- Sub_metering_2: string (nullable = true)
 |-- Sub_metering_3: string (nullable = true)



In [48]:
# Cast the three sub-meter readings and Voltage from string to float, leaving
# every other column (and the column order) unchanged. Under Spark's non-ANSI
# cast semantics, strings that are not numbers become null (with ANSI mode
# enabled the cast raises instead).
df1 = df.select(*[
    df[name].cast('float').alias(name)
    if name in ('Sub_metering_1', 'Sub_metering_2', 'Sub_metering_3', 'Voltage')
    else df[name]
    for name in df.columns
])

In [65]:
# One row per Date with one array per sub-meter, in time order.
# collect_list alone gives no ordering guarantee after the groupBy shuffle, and
# DTW compares the series position by position, so collect
# (Time, meter1, meter2, meter3) structs, sort them (structs compare field by
# field, Time first), then pull each meter back out as its own array. Nulls are
# filtered out to match plain collect_list, which skips null values.
# NOTE(review): sorting by the Time string assumes a zero-padded HH:MM:SS
# format, where lexicographic order is chronological. TODO confirm.
df2 = (
    df1.groupby("Date")
    .agg(
        F.sort_array(
            F.collect_list(
                F.struct("Time", "Sub_metering_1", "Sub_metering_2", "Sub_metering_3")
            )
        ).alias("readings")
    )
    .select(
        "Date",
        F.expr("filter(readings.Sub_metering_1, x -> x IS NOT NULL)").alias("meter1"),
        F.expr("filter(readings.Sub_metering_2, x -> x IS NOT NULL)").alias("meter2"),
        F.expr("filter(readings.Sub_metering_3, x -> x IS NOT NULL)").alias("meter3"),
    )
)

In [72]:
# Confirm the per-day aggregates are arrays of floats.
df2.printSchema()

root
 |-- Date: string (nullable = true)
 |-- meter1: array (nullable = true)
 |    |-- element: float (containsNull = false)
 |-- meter2: array (nullable = true)
 |    |-- element: float (containsNull = false)
 |-- meter3: array (nullable = true)
 |    |-- element: float (containsNull = false)



In [66]:
# Peek at two aggregated days; limit() bounds how many rows are collected to the driver.
df2.limit(2).toPandas()

Unnamed: 0,Date,meter1,meter2,meter3
0,13/2/2007,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,19/11/2007,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [75]:
def get_udf_distance(array1, array2):
    """Return the FastDTW distance between two 1-D numeric sequences.

    Intended as the body of a Spark UDF over two array<float> columns.

    Args:
        array1: First sequence of numbers (a Python list when called by Spark).
        array2: Second sequence of numbers.

    Returns:
        The DTW distance as a plain Python float, or None when either input is
        None or empty (FastDTW cannot build a warp path from an empty series,
        and None becomes a null in the Spark column).
    """
    if array1 is None or array2 is None or len(array1) == 0 or len(array2) == 0:
        return None

    # The notebook's top-level `import fastdtw` binds the *module*, which is not
    # callable; import the function explicitly (this also resolves it on executors).
    from fastdtw import fastdtw as _fastdtw

    # Elements are scalars. scipy's `euclidean` requires 1-D vectors (recent SciPy
    # rejects scalars), and for scalars the Euclidean distance is |a - b|.
    distance, _path = _fastdtw(array1, array2, dist=lambda a, b: abs(a - b))
    # Convert numpy scalars to a built-in float so Spark can serialize the
    # value for the FloatType column.
    return float(distance)
# Wrap the distance function as a Spark UDF whose result column is FloatType.
udf_dtw = udf(f=get_udf_distance, returnType=FloatType())

In [83]:
# Per-day DTW distance between the meter1 and meter2 series. The select is lazy:
# nothing is computed until an action runs on df3.
df3 = df2.select(F.col('Date'), udf_dtw('meter1', 'meter2').alias('dtw_distance'))