In [1]:
import findspark
# Presumably locates the local Spark installation so that `pyspark` becomes
# importable; it therefore has to run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Reuse the active session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('Continuous_data')
    .getOrCreate()
)

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window, WindowSpec

In [3]:
import pandas as pd
pd.options.mode.chained_assignment = None
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns
#sns.set()

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

from sklearn.linear_model import LinearRegression

In [4]:
### TODO: left-join all the parameters and keep the NaN values -> do not skip any frame
# Raw telemetry, one row per (Machine_sn, f_rank); the compression codec is
# inferred from the .gz suffix by pandas.
df = pd.read_csv(
    'PdM_telemetry_exemple.csv.gz',
)
df.head()

Unnamed: 0,Machine_sn,f_rank,rotate_1,rotate_2,pressure_1,pressure_2,volt,vibration
0,1001,1,418.504078,424.624162,113.077935,76.005332,176.217853,45.087686
1,1001,2,402.74749,432.37296,95.460525,110.907806,162.879223,43.413973
2,1001,3,527.349825,454.629639,75.237905,97.877007,170.989902,34.178847
3,1001,4,346.149335,438.391022,109.248561,84.44043,162.462833,41.122144
4,1001,5,435.376873,473.055664,111.886648,110.395683,157.610021,25.990511


In [5]:
# Only machines with Machine_sn <= 1003 are handed to Spark for this run.
df_all = spark.createDataFrame(df.loc[df['Machine_sn'].le(1003)])

In [6]:
# Reference pattern that every telemetry window is regressed against. Its row
# count must equal the window length (`pattern_len` further down, also 15).
pattern = pd.read_csv('PDM_pattern.csv').iloc[0:15]
# Fail fast: a shorter file would otherwise only surface later as an opaque
# "inconsistent numbers of samples" error raised inside a Spark task.
assert len(pattern) == 15, (
    f'PDM_pattern.csv provides only {len(pattern)} rows; 15 are required'
)
pattern

Unnamed: 0,d_rotate,d_presure
0,-44.572027,-15.981047
1,18.724784,1.739566
2,-93.753056,28.492219
3,-36.438712,-9.622047
4,73.548703,13.404613
5,39.6959,5.608549
6,16.942812,-9.421854
7,46.334792,-11.42056
8,-9.174435,21.723454
9,24.001826,-3.222246


In [7]:
# Differences between the two rotation sensors and the two pressure sensors,
# appended in this order. Each *_2 column is the exact negation of its *_1 twin.
df_all = df_all.select(
    '*',
    *[
        (F.col(minuend) - F.col(subtrahend)).alias(name)
        for name, minuend, subtrahend in (
            ('d_rotate_1', 'rotate_1', 'rotate_2'),
            ('d_rotate_2', 'rotate_2', 'rotate_1'),
            ('d_pressure_1', 'pressure_1', 'pressure_2'),
            ('d_pressure_2', 'pressure_2', 'pressure_1'),
        )
    ],
)

In [8]:
pattern_len = 15

# Forward-looking frame per machine: from the current f_rank value up to
# f_rank + pattern_len - 1. rangeBetween is based on f_rank *values*, not row
# counts, so a gap in f_rank yields a list with fewer than pattern_len items.
windowSpec = (
    Window
    .partitionBy('Machine_sn')
    .orderBy('f_rank')
    .rangeBetween(0, pattern_len - 1)
)

# One array column per difference series, holding the values inside the frame.
df_all = df_all.select(
    '*',
    *[
        F.collect_list(F.col(source)).over(windowSpec).alias(target)
        for source, target in (
            ('d_rotate_1', 'd_rotate_1_seq'),
            ('d_rotate_2', 'd_rotate_2_seq'),
            ('d_pressure_1', 'd_pressure_1_seq'),
            ('d_pressure_2', 'd_pressure_2_seq'),
        )
    ],
)

In [9]:
# Keep only rows whose forward window is complete in *every* sequence column.
# Checking d_rotate_1_seq alone let through rows whose pressure sequence is
# shorter (collect_list skips nulls — verify for your Spark version), and the
# regression UDF cannot pair up sequences of unequal length.
df_all = df_all.filter(
    (F.size(F.col('d_rotate_1_seq')) == pattern_len)
    & (F.size(F.col('d_rotate_2_seq')) == pattern_len)
    & (F.size(F.col('d_pressure_1_seq')) == pattern_len)
    & (F.size(F.col('d_pressure_2_seq')) == pattern_len)
)

In [10]:
def autocor_spark(tr1, tp1):
    """Return the R^2 of regressing the reference ``pattern`` on one window.

    Fits scikit-learn's ``LinearRegression`` (ordinary least squares, intercept
    fitted by default). ``pattern.values`` is the target and the two window
    sequences are the regressors. The fit is then scored on the same data.
    Despite its name, this is a goodness-of-fit score, not an autocorrelation.

    Parameters
    ----------
    tr1, tp1 : sequence of float or None
        Rotation-difference and pressure-difference values of one window, one
        element per frame (the arrays built with ``collect_list`` in Spark).

    Returns
    -------
    float or None
        R^2 of the fit. Returns None (a SQL null once wrapped as a UDF) when:

        - either sequence is missing;
        - the two sequences differ in length;
        - the sequences contain NaN/inf.

        These per-row data problems would otherwise raise inside a Spark task
        and fail the whole job.

    Raises
    ------
    ValueError
        If the window length differs from the number of pattern rows. This is a
        configuration mismatch, not a bad row, so it is not silenced.
    """
    if tr1 is None or tp1 is None or len(tr1) != len(tp1):
        return None

    y = pattern.values
    if len(tr1) != len(y):
        raise ValueError(
            f'window has {len(tr1)} frames but the pattern has {len(y)} rows'
        )

    # Shape (n_frames, 2): one column per regressor, as sklearn expects.
    X = np.column_stack(
        (np.asarray(tr1, dtype=float), np.asarray(tp1, dtype=float))
    )
    if not np.isfinite(X).all():
        return None

    reg = LinearRegression().fit(X, y)
    # Return a built-in float rather than a numpy scalar for the UDF result.
    return float(reg.score(X, y))


# Declared as FloatType (single precision): scores are stored as float32.
autocor_spark_udf = F.udf(autocor_spark, T.FloatType())

In [11]:
# Pattern-fit score for each sensor ordering, appended as autocor_1, autocor_2.
# NOTE(review): the *_2 difference columns are exact negations of the *_1 ones.
# An OLS fit with an intercept (LinearRegression's default) gives the same R^2
# when every regressor is negated, so autocor_2 should always equal autocor_1
# up to rounding. Confirm whether a different comparison was intended.
df_all = df_all.select(
    '*',
    *[
        autocor_spark_udf(F.col(rotation_seq), F.col(pressure_seq)).alias(score_col)
        for score_col, rotation_seq, pressure_seq in (
            ('autocor_1', 'd_rotate_1_seq', 'd_pressure_1_seq'),
            ('autocor_2', 'd_rotate_2_seq', 'd_pressure_2_seq'),
        )
    ],
)

In [12]:
# Final projection: the raw telemetry columns plus the two pattern-fit scores
# (the intermediate difference and sequence columns are dropped).
result = df_all.select([
    'Machine_sn', 'f_rank',
    'rotate_1', 'rotate_2',
    'pressure_1', 'pressure_2',
    'volt', 'vibration',
    'autocor_1', 'autocor_2',
])

In [None]:
# Collect the whole Spark result to the driver as a pandas DataFrame. This is an
# action, so it runs the full pipeline (windows + Python UDF), and the result
# must fit in driver memory.
test = result.toPandas()

In [None]:
# Preview only, as done for the raw telemetry above; the full frame stays in `test`.
test.head()