# Milestone 1 without SparkSQL - Group 18

In [1]:
import pandas as pd

import findspark
findspark.init()
from pyspark.sql import functions as sf
from pyspark.sql.functions import when, col, trim, to_timestamp
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import lead, lag
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
import matplotlib.pyplot as plt
import matplotlib as mp
import numpy as np
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Loading the Data

In [2]:
raw_df = spark.read.option("header",True) \
    .csv("../MS1.csv",inferSchema=True).toDF(
  "Dummy", "Date", "Price", "Volume"
)
print('Number of rows in raw data:', raw_df.count())

raw_df.printSchema()

Number of rows in raw data: 38374197
root
 |-- Dummy: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Volume: string (nullable = true)



# View the Data

In [3]:
raw_df.show()

+--------------------+----------+-----+------+
|               Dummy|      Date|Price|Volume|
+--------------------+----------+-----+------+
|32843.Nordamerika...|01/04/2016|18.52| 51616|
|32843.Nordamerika...|01/05/2016|19.15| 54898|
|32843.Nordamerika...|01/06/2016|19.71| 41555|
|32843.Nordamerika...|01/07/2016|19.17| 44430|
|32843.Nordamerika...|01/08/2016|18.94| 72673|
|32843.Nordamerika...|01/11/2016| 19.1| 45426|
|32843.Nordamerika...|01/12/2016|19.39| 61457|
|32843.Nordamerika...|01/13/2016|19.27| 61805|
|32843.Nordamerika...|01/14/2016|19.17| 35597|
|32843.Nordamerika...|01/15/2016|18.81| 69227|
|32843.Nordamerika...|01/18/2016|18.81| 69227|
|32843.Nordamerika...|01/19/2016|18.82| 23700|
|32843.Nordamerika...|01/20/2016|17.97| 41439|
|32843.Nordamerika...|01/21/2016|17.82| 35240|
|32843.Nordamerika...|01/22/2016|18.62| 79139|
|32843.Nordamerika...|01/25/2016|18.09| 89251|
|32843.Nordamerika...|01/26/2016|17.62| 75300|
|32843.Nordamerika...|01/27/2016|17.58| 43142|
|32843.Nordam

# Data Pre-processing

In [4]:
from pyspark.sql.functions import split
import pyspark.sql.functions as f
raw_df=raw_df.withColumn("Number", split(col("Dummy"), "[.]").getItem(0)).withColumn("Name", split(col("Dummy"), "[.]").getItem(1))

In [5]:
raw_df=raw_df.drop(raw_df.Dummy)


In [6]:
raw_df.show(100)
raw_df.count()

+----------+-------+------+------+--------------------+
|      Date|  Price|Volume|Number|                Name|
+----------+-------+------+------+--------------------+
|01/04/2016|  18.52| 51616| 32843|Nordamerika_USA-N...|
|01/05/2016|  19.15| 54898| 32843|Nordamerika_USA-N...|
|01/06/2016|  19.71| 41555| 32843|Nordamerika_USA-N...|
|01/07/2016|  19.17| 44430| 32843|Nordamerika_USA-N...|
|01/08/2016|  18.94| 72673| 32843|Nordamerika_USA-N...|
|01/11/2016|   19.1| 45426| 32843|Nordamerika_USA-N...|
|01/12/2016|  19.39| 61457| 32843|Nordamerika_USA-N...|
|01/13/2016|  19.27| 61805| 32843|Nordamerika_USA-N...|
|01/14/2016|  19.17| 35597| 32843|Nordamerika_USA-N...|
|01/15/2016|  18.81| 69227| 32843|Nordamerika_USA-N...|
|01/18/2016|  18.81| 69227| 32843|Nordamerika_USA-N...|
|01/19/2016|  18.82| 23700| 32843|Nordamerika_USA-N...|
|01/20/2016|  17.97| 41439| 32843|Nordamerika_USA-N...|
|01/21/2016|  17.82| 35240| 32843|Nordamerika_USA-N...|
|01/22/2016|  18.62| 79139| 32843|Nordamerika_US

38374197

In [38]:
raw_df
X_df = raw_df.select("Date", "Price")
X_df = X_df.sort("Date").limit(100)

In [39]:

X_df_array = np.array(X_df.select("Price").collect())
X_df_array = np.concatenate( X_df_array, axis=0 )

In [8]:
#delete missing values
raw_df=raw_df.na.drop()
# raw_df.count()

# Loading Time Series Data

In [9]:
time_df = spark.read.option("header",True) \
    .csv("../parking.csv")
print('Number of rows in raw data:', time_df.count())
time_df.show(10)
time_df.printSchema()

Number of rows in raw data: 11809233
+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-------------+-------------------+--------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+----+----+----+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street

In [10]:
#dropping the last three columns
col_drop=["BIN","BBL","NTA"]
for col in col_drop:
    time_df=time_df. drop(col)

#printing new schema
time_df.printSchema()
# time_df.count()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation County: str

In [11]:
time_df = time_df.withColumnRenamed("Issue Date", "Date")
time_df.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation County: string (n

# Merging time series dataset with stocks dataset 

In [12]:
df = raw_df.join(time_df,['Date'],how='inner').distinct()
df.show(10)

+----------+-------+-------+------+--------------------+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+------------+--------------------+--------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+
|      Date|  Price| Volume|Number|                Name|Summons Number|Plate ID|Registration State|Plate Type|Violation Code|Vehicle Body Typ

In [13]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: strin

In [14]:
df.na.drop(how="any").show(truncate=False)

+----+-----+------+------+----+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+
|Date|Price|Volume|Number|Name|Summons Number|Plate ID|Registration State|Plate Type|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Cod

In [15]:
df.show(10)

+----------+-------+-------+------+--------------------+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+------------+--------------------+--------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+
|      Date|  Price| Volume|Number|                Name|Summons Number|Plate ID|Registration State|Plate Type|Violation Code|Vehicle Body Typ

In [16]:
#df.count() # Result = 60710470

In [58]:
import numpy as np
from timeit import default_timer as timer

def aggregate_avg(Y1, Y2):
    start = timer()
    Y = []
    
    if len(Y1) != len (Y2):
        print("Arrays have different lengths!")
    else:
        for i in range(len(Y1)):
            Y.append(np.mean([Y1[i], Y2[i]]))
    end = timer()
    print("Time Agg avg: ", end - start)
    return Y

def aggregate_min(Y1, Y2):
    start = timer()
    Y = []
    
    if len(Y1) != len (Y2):
        print("Arrays have different lengths!")
    else:
        Y = np.minimum(Y1, Y2)
    end = timer()
    print("Time Agg min: ", end - start)
    return Y

def aggregate_max(Y1, Y2):
    start = timer()
    Y = []
    
    if len(Y1) != len (Y2):
        print("Arrays have different lengths!")
    else:
        Y = np.maximum(Y1, Y2)
    end = timer()
    print("Time Agg max: ", end - start)
    return Y

def sum_of_multiplications(X, Y):
    sum_XY = 0
    if len(X) != len (Y):
        print("Arrays have different lengths!")
    else:
        for i in range(len(X)):
            mult = X[i] * Y[i]
            sum_XY = sum_XY + mult
    return sum_XY

def is_similar(a, b, tau):
    if a < b:
        x = a
        y = b
    else:
        y = a
        x = b
    x_squared = x**2
    y_squared = y**2
    simi_score = x_squared / math.sqrt(x_squared * y_squared)
    if simi_score > tau:
        return True
    else: 
        return False

        

In [86]:
import math
# X = [3, 1, 4, 2, 3, 1, 4, 2, 3, 1, 4, 2, 3, 1, 4, 2, 3, 1, 4, 2, 3, 1, 4, 2]
# Y1 = [1, 2, 1, 6, 1, 2, 1, 6, 1, 2, 1, 6, 1, 2, 1, 6, 1, 2, 1, 6, 1, 2, 1, 6]
# Y2 = [3, 4, 1, 0, 3, 4, 1, 0, 3, 4, 1, 0, 3, 4, 1, 0, 3, 4, 1, 0, 3, 4, 1, 0]
X = X_df_array
Y1 = np.random.randint(5, 10, 100)
Y2 = np.random.randint(5, 10, 100)
Y_prime = aggregate_avg(Y1, Y2)

sum_X_YY = sum_of_multiplications(X, Y_prime)
sum_X_squared = sum_of_multiplications(X, X)
sum_Y_squared = sum_of_multiplications(Y_prime, Y_prime)

cos_X_YY = sum_X_YY / (math.sqrt(sum_X_squared) * math.sqrt(sum_Y_squared))

cos_X_YY

Time Agg avg:  0.001473199999963981


0.3100577357662088

In [87]:
threshold = 0.1
tau_avg = cos_X_YY + threshold
tau_min = cos_X_YY + threshold
tau_max = cos_X_YY + threshold

In [88]:
X_results = []
Y_results = []

for i in range(len(X)):
    if is_similar(X[i], Y_prime[i], tau_avg):
        X_results.append(X[i])
        Y_results.append(Y_prime[i])
        
    if len(X_results) > 20:
        print("More than 20 results have been found")
        break;

X_results, Y_results


More than 20 results have been found


([15.09,
  10.11,
  11.3,
  11.45,
  11.76,
  18.05,
  18.78,
  19.09,
  7.13,
  15.08,
  10.35,
  6.04,
  7.055,
  7.24875,
  3.822,
  6.19,
  8.04,
  7.3,
  4.6,
  12.19,
  10.64],
 [8.0,
  5.5,
  7.5,
  7.5,
  7.0,
  8.0,
  8.0,
  8.0,
  8.5,
  6.5,
  7.0,
  9.0,
  7.5,
  6.5,
  7.0,
  8.0,
  8.5,
  5.5,
  8.5,
  9.0,
  7.5])