In [1]:
# Import the Spark entry points used by the rest of the notebook.
# If PySpark is missing, show the restart hint and stop here: continuing would
# only fail later with a NameError on SparkContext / SparkSession.
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError:
    # The previous handler called `printmd`, which is not defined before this
    # cell, so the hint was replaced by a NameError. Plain print always works.
    print('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')
    raise

In [2]:
# Driver memory is fixed when the driver JVM starts, i.e. when the first
# SparkContext is created. It therefore has to be part of the SparkConf used
# for that first context; setting it on the SparkSession builder afterwards
# (as was done before) is silently ignored.
# NOTE: if a SparkContext is already running in this kernel, getOrCreate()
# reuses it and the setting still has no effect - restart the kernel first.
spark_conf = (
    SparkConf()
    .setMaster("local[*]")
    .set("spark.driver.memory", "2g")
)
sc = SparkContext.getOrCreate(spark_conf)

# Reuses the context created above.
spark = SparkSession.builder.getOrCreate()

In [5]:
# Read the parquet dataset into a DataFrame.
df = spark.read.parquet('washing.parquet')
# Register the DataFrame under the name "washing" so it can be queried with spark.sql.
df.createOrReplaceTempView("washing")

In [6]:
# For every row, compute the min and max of each sensor column over a sliding
# window made of the current row and the 10 following rows (ordered by ts),
# then keep only rows where every aggregate is non-null.
# The null filter previously skipped max_hardness; it is added so that all
# twelve aggregate columns are checked consistently. (min/max over the same
# window are null together, so this does not change which rows are kept.)
# NOTE: the window has no PARTITION BY, so Spark evaluates it in a single
# partition.
result = spark.sql("""
SELECT * from (
    SELECT
    min(temperature) over w as min_temperature,
    max(temperature) over w as max_temperature,
    min(voltage) over w as min_voltage,
    max(voltage) over w as max_voltage,
    min(flowrate) over w as min_flowrate,
    max(flowrate) over w as max_flowrate,
    min(frequency) over w as min_frequency,
    max(frequency) over w as max_frequency,
    min(hardness) over w as min_hardness,
    max(hardness) over w as max_hardness,
    min(speed) over w as min_speed,
    max(speed) over w as max_speed
    FROM washing
    WINDOW w AS (ORDER BY ts ROWS BETWEEN CURRENT ROW AND 10 FOLLOWING)
)
WHERE min_temperature is not null
AND max_temperature is not null
AND min_voltage is not null
AND max_voltage is not null
AND min_flowrate is not null
AND max_flowrate is not null
AND min_frequency is not null
AND max_frequency is not null
AND min_hardness is not null
AND max_hardness is not null
AND min_speed is not null
AND max_speed is not null
""")

In [7]:
# Number of rows removed by the null filter: rows in the source table minus rows in `result`.
df.count()-result.count()

7

In [8]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [9]:
# Combine all columns of `result` (the min/max aggregates) into a single vector column "features".
assembler = VectorAssembler(inputCols=result.columns, outputCol="features")

In [10]:
# Apply the assembler to `result`; the output carries the additional "features" column.
features = assembler.transform(result)

In [12]:
# Configure a PCA that reduces the "features" vectors to k=3 components,
# written to the "pcaFeatures" column.
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
# Fit the PCA on the assembled feature vectors.
model = pca.fit(features)

In [13]:
# Project the feature vectors with the fitted PCA model, keep only the
# projected column, and preview it without truncating the cell contents.
projected = model.transform(features)
result_pca = projected.select("pcaFeatures")
result_pca.show(truncate=False)

+----------------------------------------------------------+
|pcaFeatures                                               |
+----------------------------------------------------------+
|[1459.9789705815301,18.7452377818137,70.78430794789449]   |
|[1459.9954818287874,19.113431461685494,70.72738871418574] |
|[1460.0895843562396,20.96947106295569,70.75630600314616]  |
|[1469.69939294206,20.4031246476478,62.01356967480217]     |
|[1469.7159041893176,20.771318327519595,61.956650441093416]|
|[1469.7128317339768,20.7907511172548,61.89610667824968]   |
|[1478.353026457406,20.294557029761734,71.67550104802774]  |
|[1478.353026457406,20.294557029761734,71.67550104802774]  |
|[1478.3686036139297,20.260626897669283,71.63355353599673] |
|[1478.3686036139297,20.260626897669283,71.63355353599673] |
|[1483.5412027685188,20.006222577534125,66.82710394276796] |
|[1483.5171090224458,20.867020421616665,66.86707301946433] |
|[1483.4224268544053,19.875748236687734,66.93027077907936] |
|[1483.4224268544053,19.

In [14]:
# Row count of the PCA-projected DataFrame.
result_pca.count()

2051

In [15]:
# Sample the projected rows without replacement, keeping each row with
# probability 0.8. The fixed seed makes the sample reproducible across runs;
# without it every execution draws a different subset.
rdd = result_pca.rdd.sample(False, 0.8, seed=42)

In [17]:
# Could not continue past this point because of a memory issue.