# SQL и Pandas

In [1]:
import pandas as pd

In [2]:
data = pd.read_csv("/home/jovyan/work/data/gbc_repair_db.csv")

In [3]:
data.head()

Unnamed: 0,id,p00,p01,p02,p03,p04,p05,p06,p07,p08,...,p14,p15,p16,p17,p18,p19,p20,s1,s2,date
0,1,518.67,1.3,47.47,521.66,2388.02,8138.62,8.4195,0.03,392,...,1589.7,1400.6,14.62,21.61,554.36,2388.06,9046.19,-0.0007,-0.0004,2021-09-25
1,1,518.67,1.3,47.49,522.28,2388.07,8131.49,8.4318,0.03,392,...,1591.82,1403.14,14.62,21.61,553.75,2388.04,9044.07,0.0019,-0.0003,2021-09-26
2,1,518.67,1.3,47.27,522.42,2388.03,8133.23,8.4178,0.03,390,...,1587.99,1404.2,14.62,21.61,554.26,2388.08,9052.94,-0.0043,0.0003,2021-09-27
3,1,518.67,1.3,47.13,522.86,2388.08,8133.83,8.3682,0.03,392,...,1582.79,1401.87,14.62,21.61,554.45,2388.11,9049.48,0.0007,0.0,2021-09-28
4,1,518.67,1.3,47.28,522.19,2388.04,8133.8,8.4294,0.03,393,...,1582.85,1406.22,14.62,21.61,554.0,2388.06,9055.15,-0.0019,-0.0002,2021-09-29


In [4]:
def lag_columns(df:pd.DataFrame):
    data_test = df.copy()
    for col in data_test.columns[1:-1]:
        for i in range (1,4):
            data_test[f"{col}_{i}"] = data_test.groupby(['id'])[col].shift(i)
            if i == 1:
                prev_col = col
            else: 
                prev_col =f"{col}_{i-1}"
            data_test[f"{col}_{i-1}_to_{i}"] = data_test[f"{col}_{i}"] - data_test[prev_col]
        data_test.drop(columns=[f"{col}_{i}" if i!=0 else col for i in range(0,4)],inplace=True)
    return data_test

In [5]:
data_lag = lag_columns(data)
data_lag.head()

Unnamed: 0,id,date,p00_0_to_1,p00_1_to_2,p00_2_to_3,p01_0_to_1,p01_1_to_2,p01_2_to_3,p02_0_to_1,p02_1_to_2,...,p19_2_to_3,p20_0_to_1,p20_1_to_2,p20_2_to_3,s1_0_to_1,s1_1_to_2,s1_2_to_3,s2_0_to_1,s2_1_to_2,s2_2_to_3
0,1,2021-09-25,,,,,,,,,...,,,,,,,,,,
1,1,2021-09-26,0.0,,,0.0,,,-0.02,,...,,2.12,,,-0.0026,,,-0.0001,,
2,1,2021-09-27,0.0,0.0,,0.0,0.0,,0.22,-0.02,...,,-8.87,2.12,,0.0062,-0.0026,,-0.0006,-0.0001,
3,1,2021-09-28,0.0,0.0,0.0,0.0,0.0,0.0,0.14,0.22,...,0.02,3.46,-8.87,2.12,-0.005,0.0062,-0.0026,0.0003,-0.0006,-0.0001
4,1,2021-09-29,0.0,0.0,0.0,0.0,0.0,0.0,-0.15,0.14,...,-0.04,-5.67,3.46,-8.87,0.0026,-0.005,0.0062,0.0002,0.0003,-0.0006


# Spark

In [6]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("gbc_test")
         # Add postgres jar
         .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-9.4.1207.jar")
         .getOrCreate())
sc = spark.sparkContext

22/04/12 12:02:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
df_equipment = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres/airflow")
    .option("dbtable", "public.equipment")
    .option("user", "airflow")
    .option("password", "airflow")
    .load()
)

In [8]:
df_equipment.createTempView("equipment")

In [9]:
df_equipment.show(10, True, True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

-RECORD 0-------------------
 id   | 1                   
 p00  | 518.67              
 p01  | 1.3                 
 p02  | 47.47               
 p03  | 521.66              
 p04  | 2388.02             
 p05  | 8138.62             
 p06  | 8.4195              
 p07  | 0.03                
 p08  | 392                 
 p09  | 2388                
 p10  | 100                 
 p11  | 641.82              
 p12  | 39.06               
 p13  | 23.419              
 p14  | 1589.7              
 p15  | 1400.6              
 p16  | 14.62               
 p17  | 21.61               
 p18  | 554.36              
 p19  | 2388.06             
 p20  | 9046.19             
 s1   | -7.0E-4             
 s2   | -4.0E-4             
 date | 2021-09-25 00:00:00 
-RECORD 1-------------------
 id   | 1                   
 p00  | 518.67              
 p01  | 1.3                 
 p02  | 47.49               
 p03  | 522.28              
 p04  | 2388.07             
 p05  | 8131.49             
 p06  | 8.4318

In [10]:
spark.sql("select * from equipment").show(1, True, True)

-RECORD 0-------------------
 id   | 1                   
 p00  | 518.67              
 p01  | 1.3                 
 p02  | 47.47               
 p03  | 521.66              
 p04  | 2388.02             
 p05  | 8138.62             
 p06  | 8.4195              
 p07  | 0.03                
 p08  | 392                 
 p09  | 2388                
 p10  | 100                 
 p11  | 641.82              
 p12  | 39.06               
 p13  | 23.419              
 p14  | 1589.7              
 p15  | 1400.6              
 p16  | 14.62               
 p17  | 21.61               
 p18  | 554.36              
 p19  | 2388.06             
 p20  | 9046.19             
 s1   | -7.0E-4             
 s2   | -4.0E-4             
 date | 2021-09-25 00:00:00 
only showing top 1 row



In [11]:
spark.sql("select id, date, p03 \
          ,LAG(p03,1) OVER (PARTITION BY id ORDER BY date) as p03_1 \
          ,LAG(p03,2) OVER (PARTITION BY id ORDER BY date) as p03_2 \
          from equipment  where id = 1"
         ).show(3, True, True)



-RECORD 0--------------------
 id    | 1                   
 date  | 2021-09-25 00:00:00 
 p03   | 521.66              
 p03_1 | null                
 p03_2 | null                
-RECORD 1--------------------
 id    | 1                   
 date  | 2021-09-26 00:00:00 
 p03   | 522.28              
 p03_1 | 521.66              
 p03_2 | null                
-RECORD 2--------------------
 id    | 1                   
 date  | 2021-09-27 00:00:00 
 p03   | 522.42              
 p03_1 | 522.28              
 p03_2 | 521.66              
only showing top 3 rows



                                                                                

In [12]:
import pyspark.sql.functions as F
from pyspark.sql import Window

In [13]:
id_column = "id"
order_column = "date"
windowSpec = (Window
              .partitionBy(id_column)
              .orderBy(order_column)
)

In [14]:
def spark_lag_columns(df):
    df_res = df
    for col in df_res.columns[1:-1]:
        for i in range (1,4):
            df_res = df_res.withColumn(f"{col}_{i}",F.lag(col,i).over(windowSpec))

            if i == 1:
                prev_col = col
            else: 
                prev_col =f"{col}_{i-1}"

            df_res = df_res.withColumn(f"{col}_{i-1}_to_{i}", F.col(f"{col}_{i}") - F.col(prev_col))

        df_res = df_res.drop(*[f"{col}_{i}" if i!=0 else col for i in range(0,4)])
    
    return df_res

In [15]:
df_equipment_lag = spark_lag_columns(df_equipment)

In [16]:
df_equipment_lag.where(F.col("id")==1).show(3,True,True)

22/04/12 12:03:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/04/12 12:03:03 WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB
22/04/12 12:03:04 WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB
22/04/12 12:03:09 WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB
22/04/12 12:03:28 WARN DAGScheduler: Broadcasting large task binary with size 1295.7 KiB

-RECORD 0--------------------------
 id         | 1                    
 date       | 2021-09-25 00:00:00  
 p00_0_to_1 | null                 
 p00_1_to_2 | null                 
 p00_2_to_3 | null                 
 p01_0_to_1 | null                 
 p01_1_to_2 | null                 
 p01_2_to_3 | null                 
 p02_0_to_1 | null                 
 p02_1_to_2 | null                 
 p02_2_to_3 | null                 
 p03_0_to_1 | null                 
 p03_1_to_2 | null                 
 p03_2_to_3 | null                 
 p04_0_to_1 | null                 
 p04_1_to_2 | null                 
 p04_2_to_3 | null                 
 p05_0_to_1 | null                 
 p05_1_to_2 | null                 
 p05_2_to_3 | null                 
 p06_0_to_1 | null                 
 p06_1_to_2 | null                 
 p06_2_to_3 | null                 
 p07_0_to_1 | null                 
 p07_1_to_2 | null                 
 p07_2_to_3 | null                 
 p08_0_to_1 | null          

                                                                                

# Скорим данные с Pandas_UDF

In [17]:
import pickle
model = pickle.load(open('/home/jovyan/work/models/linear_model.sav', 'rb'))

https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations


In [18]:
X = data_lag[(data_lag.id == 1) & (data_lag.date=='2021-09-28')]

In [19]:
def predict(df: pd.DataFrame):
    equipment_id = df['id']
    date = df['date']
    df.drop(columns = ['id','date'], inplace = True)

    predictions = model.predict(df)

    result = pd.DataFrame({'id': equipment_id,
                        'date': date,
                        'predict': predictions})
    return result

In [20]:
result = predict(X)
result

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return super().drop(


Unnamed: 0,id,date,predict
3,1,2021-09-28,122.737878


In [21]:
from pyspark.sql.functions import pandas_udf
import pyspark.sql.types as T

In [23]:
df_spark_test = (df_equipment_lag
                 .where((F.col("id")==1) & (F.col("date")==F.lit('2021-09-28')))
                )

In [24]:
schema = T.StructType([T.StructField('id', T.LongType(), False),
                    T.StructField('date', T.DateType(), False),
                    T.StructField('predict', T.FloatType(), False)
                    ])

@pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def predict_repair(df: pd.DataFrame):
    equipment_id = df['id']
    date = df['date']
    features = df[df.columns[2:]]

    predictions = model.predict(features)

    result = pd.DataFrame({'id': equipment_id,
                        'date': date,
                        'predict': predictions})
    return result

In [25]:
result_spark = df_spark_test.groupby('id').apply(predict_repair)



In [26]:
result_spark.show(1, True, True)

22/04/12 12:08:44 WARN DAGScheduler: Broadcasting large task binary with size 1295.2 KiB
22/04/12 12:08:45 WARN DAGScheduler: Broadcasting large task binary with size 1295.2 KiB
22/04/12 12:08:48 WARN DAGScheduler: Broadcasting large task binary with size 1295.2 KiB
22/04/12 12:09:03 WARN DAGScheduler: Broadcasting large task binary with size 1295.2 KiB
22/04/12 12:10:32 WARN DAGScheduler: Broadcasting large task binary with size 1295.2 KiB

-RECORD 0-------------
 id      | 1          
 date    | 2021-09-28 
 predict | 122.73788  



                                                                                