In [1]:
# batch processing demo - with spark kmean 
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime
import os

findspark.init()
findspark.find()
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("DiseaseDataBatch") \
    .getOrCreate()


#     .config("spark.sql.shuffle.partitions", "1") \
print(datetime.now())

2020-04-22 17:56:47.765860


In [2]:
full_df = spark.read \
    .format("csv") \
    .option("comment", "#") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data.csv")
full_df.show(3)

+-------------------+--------+------+-----+
|               date|      kw|region|value|
+-------------------+--------+------+-----+
|2004-01-01 00:00:00|Headache|    IN|  0.0|
|2004-01-02 00:00:00|Headache|    IN|  0.0|
|2004-01-03 00:00:00|Headache|    IN|  0.0|
+-------------------+--------+------+-----+
only showing top 3 rows



In [3]:
full_df.show(3)

+-------------------+--------+------+-----+
|               date|      kw|region|value|
+-------------------+--------+------+-----+
|2004-01-01 00:00:00|Headache|    IN|  0.0|
|2004-01-02 00:00:00|Headache|    IN|  0.0|
|2004-01-03 00:00:00|Headache|    IN|  0.0|
+-------------------+--------+------+-----+
only showing top 3 rows



In [4]:
df = full_df.filter(full_df.region == "US") \
    .groupBy(full_df.date, "region") \
    .pivot("kw") \
    .agg(first("value")) \
    .sort("date") \
    .na.fill(0)

df.show(n=1,vertical=True)

-RECORD 0----------------------------------
 date                | 2004-01-01 00:00:00 
 region              | US                  
 Abdominal pain      | 0.0                 
 Common cold         | 72.0                
 Cough               | 94.0                
 Diarrhea            | 100.0               
 Dizziness           | 60.0                
 Fever               | 72.0                
 Headache            | 58.0                
 Influenza           | 100.0               
 Itch                | 81.0                
 Mucus               | 0.0                 
 Nausea              | 63.0                
 Pharyngitis         | 0.0                 
 Phlegm              | 0.0                 
 Pneumonia           | 19.0                
 Shortness of breath | 0.0                 
 Skin rash           | 49.0                
 Sneeze              | 98.0                
 Sore throat         | 37.0                
 Virus               | 39.0                
 Vomiting            | 47.0     

In [5]:
 #.filter(df.date > datetime(2019,4,1))
shrink_df = df.coalesce(8)

shrink_df.show(n=1,vertical=True)

-RECORD 0----------------------------------
 date                | 2004-01-01 00:00:00 
 region              | US                  
 Abdominal pain      | 0.0                 
 Common cold         | 72.0                
 Cough               | 94.0                
 Diarrhea            | 100.0               
 Dizziness           | 60.0                
 Fever               | 72.0                
 Headache            | 58.0                
 Influenza           | 100.0               
 Itch                | 81.0                
 Mucus               | 0.0                 
 Nausea              | 63.0                
 Pharyngitis         | 0.0                 
 Phlegm              | 0.0                 
 Pneumonia           | 19.0                
 Shortness of breath | 0.0                 
 Skin rash           | 49.0                
 Sneeze              | 98.0                
 Sore throat         | 37.0                
 Virus               | 39.0                
 Vomiting            | 47.0     

In [6]:
features_columns = shrink_df.drop("date","region").columns
print(features_columns)

['Abdominal pain', 'Common cold', 'Cough', 'Diarrhea', 'Dizziness', 'Fever', 'Headache', 'Influenza', 'Itch', 'Mucus', 'Nausea', 'Pharyngitis', 'Phlegm', 'Pneumonia', 'Shortness of breath', 'Skin rash', 'Sneeze', 'Sore throat', 'Virus', 'Vomiting']


In [7]:
# transform the features columns to a single feature vector 
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler()\
      .setInputCols(features_columns)\
      .setOutputCol("features")

data = assembler\
      .transform(shrink_df)\
      .select("features", "date") 

data.show(2)

+--------------------+-------------------+
|            features|               date|
+--------------------+-------------------+
|[0.0,72.0,94.0,10...|2004-01-01 00:00:00|
|[66.0,60.0,59.0,2...|2004-01-02 00:00:00|
+--------------------+-------------------+
only showing top 2 rows



In [8]:
# normlize data's featueres by z-score (std=1, mean=0)
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scaler_model = scaler.fit(data)
scaled_data = scaler_model.transform(data).drop("features").withColumnRenamed('scaled_features','features')
scaled_data.show(2)

+--------------------+-------------------+--------------------+
|            features|               date|     scaled_features|
+--------------------+-------------------+--------------------+
|[0.0,72.0,94.0,10...|2004-01-01 00:00:00|[-2.7931557899775...|
|[66.0,60.0,59.0,2...|2004-01-02 00:00:00|[0.01874477129218...|
+--------------------+-------------------+--------------------+
only showing top 2 rows



In [15]:
# reduce dimantion by PCA feature transformer
from pyspark.ml.feature import PCA
pca = PCA(k=4, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(scaled_data)
pca_result = pca_model.transform(scaled_data).drop("features").withColumnRenamed('pca_features','features')
pca_result.show(2)

+-------------------+--------------------+
|               date|            features|
+-------------------+--------------------+
|2004-01-01 00:00:00|[2.75566526065209...|
|2004-01-02 00:00:00|[2.71959250860813...|
+-------------------+--------------------+
only showing top 2 rows



In [21]:
dataset = pca_result
dataset.show(2)

+-------------------+--------------------+
|               date|            features|
+-------------------+--------------------+
|2004-01-01 00:00:00|[2.75566526065209...|
|2004-01-02 00:00:00|[2.71959250860813...|
+-------------------+--------------------+
only showing top 2 rows



In [22]:
# select K (number of clusers) by silhouette's score
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

def select_k(dataset,k_range ,random_state=None,do_print=False):
    max_k = None
    max_avg_k = None
    evaluator = ClusteringEvaluator()
    for k in k_range:
        if do_print:
            print(f'clalculating k={k}')
        # Trains a k-means model.
        kmeans  = KMeans().setK(k).setSeed(1).fit(dataset)
        # Make predictions
        predictions = kmeans.transform(dataset)
        silhouette_avg =  evaluator.evaluate(predictions)
        if  max_k is None or silhouette_avg > max_avg_k:
            max_k , max_avg_k = k, silhouette_avg
            if do_print:
                print(f'max k={max_k}, silhouette_avg={max_avg_k}')
    return max_k 


k = select_k(dataset,range(2,7),do_print=True)
print(f'selected k:{k}')

clalculating k=2
max k=2, silhouette_avg=0.6560627400318632
clalculating k=3
max k=3, silhouette_avg=0.6761349279741743
clalculating k=4
clalculating k=5
clalculating k=6
selected k:3


In [23]:
# create a KMean clustering model and cluster the data ('featuers')
from pyspark.ml.clustering import KMeans

# Trains a k-means model.
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Shows the result.
centers = model.clusterCenters()
print(f'Cluster Centers: {centers} ')  
predictions.show(3)  

Cluster Centers: [array([-6.72408486, -1.53881906,  1.04642048, -0.09144765]), array([ 3.43041146, -0.5833448 ,  0.00767498, -0.00437638]), array([-1.91032674,  0.87734099, -0.22291329,  0.02304035])] 
+-------------------+--------------------+----------+
|               date|            features|prediction|
+-------------------+--------------------+----------+
|2004-01-01 00:00:00|[2.75566526065209...|         1|
|2004-01-02 00:00:00|[2.71959250860813...|         1|
|2004-01-03 00:00:00|[2.11668035480570...|         1|
+-------------------+--------------------+----------+
only showing top 3 rows



In [24]:
# calculate distance between each point to it's cluster
from pyspark.sql.types import *

def calculate_distance(features,prediction):
    center = centers[int(prediction)]
    return float(features.squared_distance(center))

udf_translate = udf(translate, ArrayType(DoubleType()))
udf_calculate_distance = udf(calculate_distance, DoubleType())


tmp = predictions.withColumn("distance",udf_calculate_distance('features','prediction'))
tmp.show(365)

+-------------------+--------------------+----------+------------------+
|               date|            features|prediction|          distance|
+-------------------+--------------------+----------+------------------+
|2004-01-01 00:00:00|[2.75566526065209...|         1|2.6389982314304037|
|2004-01-02 00:00:00|[2.71959250860813...|         1|20.750526196255652|
|2004-01-03 00:00:00|[2.11668035480570...|         1| 7.978713543689333|
|2004-01-04 00:00:00|[2.17759802588386...|         1| 7.426774791490905|
|2004-01-05 00:00:00|[2.29338148081628...|         1|15.475459105709806|
|2004-01-06 00:00:00|[4.43804729549920...|         1| 2.799904101127484|
|2004-01-07 00:00:00|[4.03751091269162...|         1| 3.889388483225952|
|2004-01-08 00:00:00|[4.53652448795289...|         1|3.2766370851183577|
|2004-01-09 00:00:00|[4.21302458297084...|         1|3.1880853636147117|
|2004-01-10 00:00:00|[6.08643807052067...|         1| 7.759247614713291|
|2004-01-11 00:00:00|[3.75854320038617...|         

In [25]:
# calculate statistics (distances' mean and std)
# unused (only for display)
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = tmp.select(
    _mean(col('distance')).alias('mean'),
    _stddev(col('distance')).alias('std')
).collect()[0]

distances_mean = df_stats['mean']
distances_std = df_stats['std']

print(f'distances mean:{distances_mean}, std:{distances_std}')

distances mean:5.892223045643053, std:33.443108374485014


In [33]:
# normlize the distances by z-score (mean=0, std=1)
diatances_assembler = VectorAssembler()\
      .setInputCols(['distance'])\
      .setOutputCol('distance_v')

diatances_data = diatances_assembler\
      .transform(tmp).drop("distance").withColumnRenamed('distance_v','distance')



diatances_scaler = StandardScaler(inputCol="distance", outputCol="scaled_distance",
                        withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
diatances_scaler_model = diatances_scaler.fit(diatances_data)
diatances_scaler_data = diatances_scaler_model.transform(diatances_data)\
    .drop("distance").withColumnRenamed('scaled_distance','distance')
diatances_scaler_data.show(50)

+-------------------+--------------------+----------+--------------------+
|               date|            features|prediction|            distance|
+-------------------+--------------------+----------+--------------------+
|2004-01-01 00:00:00|[2.75566526065209...|         1|[-0.0972763888387...|
|2004-01-02 00:00:00|[2.71959250860813...|         1|[0.4442859492674595]|
|2004-01-03 00:00:00|[2.11668035480570...|         1|[0.0623892514619885]|
|2004-01-04 00:00:00|[2.17759802588386...|         1|[0.04588544009320...|
|2004-01-05 00:00:00|[2.29338148081628...|         1|[0.2865533894982728]|
|2004-01-06 00:00:00|[4.43804729549920...|         1|[-0.0924650576701...|
|2004-01-07 00:00:00|[4.03751091269162...|         1|[-0.0598878112641...|
|2004-01-08 00:00:00|[4.53652448795289...|         1|[-0.0782100135919...|
|2004-01-09 00:00:00|[4.21302458297084...|         1|[-0.0808578452621...|
|2004-01-10 00:00:00|[6.08643807052067...|         1|[0.05582688511378...|
|2004-01-11 00:00:00|[3.7

In [34]:
# show distances that exceed the boundaries as outliers
# todo -> should very small disatances considered an outliers?
first_element=udf(lambda v:float(v[0]),DoubleType())
diatances_scaler_data.where((first_element('distance') > 2) | (first_element('distance')<-2 ))\
    .show(365)

+-------------------+--------------------+----------+--------------------+
|               date|            features|prediction|            distance|
+-------------------+--------------------+----------+--------------------+
|2020-03-09 00:00:00|[-12.101905466922...|         0|[2.9050237178837337]|
|2020-03-10 00:00:00|[-13.329292403489...|         0|[4.8176695614385014]|
|2020-03-11 00:00:00|[-14.240059796637...|         0| [7.270441795196232]|
|2020-03-12 00:00:00|[-19.928794448505...|         0|[24.649256598072814]|
|2020-03-13 00:00:00|[-20.323300103831...|         0|[23.275286686885934]|
|2020-03-14 00:00:00|[-21.471956937061...|         0|[20.319729230214545]|
|2020-03-15 00:00:00|[-21.185721197494...|         0| [20.00482072780182]|
|2020-03-16 00:00:00|[-20.246439696490...|         0| [21.42602768115987]|
|2020-03-17 00:00:00|[-20.098471292476...|         0|[20.945159388374325]|
|2020-03-18 00:00:00|[-20.257046648872...|         0| [19.84132264159401]|
|2020-03-19 00:00:00|[-20

In [28]:
print(datetime.now())

2020-04-22 18:49:02.335847


In [31]:
diatances_scaler_data.explain

<bound method DataFrame.explain of DataFrame[date: timestamp, features: vector, prediction: int, distance: double, distance_v: vector, scaled_distance: vector]>

In [37]:
diatances_scaler_data.withColumn("distance",first_element('distance')).drop('features')\
    .coalesce(1).write\
      .option("header","true")\
      .option("sep",",")\
      .mode("overwrite")\
      .csv("cc_out.csv")

In [38]:
diatances_scaler_data.explain

<bound method DataFrame.explain of DataFrame[date: timestamp, features: vector, prediction: int, distance: vector]>