In [1]:
# Imports
from time import time

import pyspark.sql.dataframe
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder\
    .config('spark.driver.extraClassPath', '../jar/*')\
    .getOrCreate()

23/01/26 10:48:33 WARN Utils: Your hostname, Wylders-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.5 instead (on interface en0)
23/01/26 10:48:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/26 10:49:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def time_measure(funcion):
    def measure_function(*args, **kwargs):
        start = time()
        c = funcion(*args, **kwargs)
        print(time() - start)
        return c
    return measure_function

In [3]:
@time_measure
def read_sql_spark(url: str, query: str, spark_session: pyspark.sql.session.SparkSession, is_table:bool=False ) -> pyspark.sql.dataframe.DataFrame:

    df = spark_session.read\
        .format('com.microsoft.sqlserver.jdbc.spark')\
        .option('url', url)\
        .option('dbtable' if is_table else 'query', query).load()

    return df

In [27]:
url_db = 'jdbc:sqlserver://xmdevpronosticos.database.windows.net:1433;database=PronDDAdev;user=admindevpron;password=Pronosticos2019.*'
atypical_date_table_name = '[ETLStaging].[AtypicalDate]'
df_atypical_date = read_sql_spark(url_db, atypical_date_table_name, spark, is_table=True )

df_atypical_date.show()

0.8184540271759033


[Stage 14:>                                                         (0 + 1) / 1]

+------------+----------------+--------------------+-------+-------------------+
|AtypicalDate|AtypicalDateCode|    AtypicalDateName|TypeDay|     XMCreationDate|
+------------+----------------+--------------------+-------+-------------------+
|  2000-01-01|          1--ENE|    Primero de enero|   FEST|2020-02-04 18:31:22|
|  2000-01-02|          2--ENE|        Dos de enero|    ORD|2020-02-04 18:31:22|
|  2000-01-03|           LUNES|Lunes en vacacion...|    ORD|2020-02-04 18:31:22|
|  2000-01-04|          MARTES|Martes en vacacio...|    ORD|2020-02-04 18:31:22|
|  2000-01-05|       MIERCOLES|MiÃ©rcoles en vac...|    ORD|2020-02-04 18:31:22|
|  2000-01-06|          JUEVES|Jueves en vacacio...|    ORD|2020-02-04 18:31:22|
|  2000-01-07|         VIERNES|Viernes en vacaci...|    ORD|2020-02-04 18:31:22|
|  2000-01-08|          SABADO|SÃ¡bado antes de ...|    SAB|2020-02-04 18:31:22|
|  2000-01-09|         DOMINGO|Domingo antes de ...|   FEST|2020-02-04 18:31:22|
|  2000-01-10|              

                                                                                

In [5]:
print('PySpark Version :'+spark.version)
print('PySpark Version :'+spark.sparkContext.version)

PySpark Version :3.2.1
PySpark Version :3.2.1


In [6]:
from datetime import timedelta, datetime
import pyspark.sql.functions as F

time_exec = '2023-01-24 00:00:00'

forecast_start = '2023-01-23 00:00:00'
forecast_start = datetime.strptime(forecast_start, "%Y-%m-%d %H:%M:%S")
forecast_start = forecast_start.replace(second=0, microsecond=0)
forecast_start = str(forecast_start - timedelta(minutes=5))

# Calendar
query = 'EXEC ETLStaging.uspGetAtypicalDate'


In [7]:
@time_measure
def dias(df: pyspark.sql.dataframe.DataFrame):

    df.createOrReplaceTempView('df_calendar')

    df = spark.sql(
        'SELECT *, '
        'DAY(AtypicalDate) as day, '
        'MONTH(AtypicalDate) as month,'
        'YEAR(AtypicalDate) as year '
        'FROM df_calendar'
    )

    return df

In [8]:
df_atypical_date = dias(df_atypical_date)

0.2461230754852295


In [9]:
df_atypical_date.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------+----------------+--------------------+-------+-------------------+---+-----+----+
|AtypicalDate|AtypicalDateCode|    AtypicalDateName|TypeDay|     XMCreationDate|day|month|year|
+------------+----------------+--------------------+-------+-------------------+---+-----+----+
|  2000-01-01|          1--ENE|    Primero de enero|   FEST|2020-02-04 18:31:22|  1|    1|2000|
|  2000-01-02|          2--ENE|        Dos de enero|    ORD|2020-02-04 18:31:22|  2|    1|2000|
|  2000-01-03|           LUNES|Lunes en vacacion...|    ORD|2020-02-04 18:31:22|  3|    1|2000|
|  2000-01-04|          MARTES|Martes en vacacio...|    ORD|2020-02-04 18:31:22|  4|    1|2000|
|  2000-01-05|       MIERCOLES|MiÃ©rcoles en vac...|    ORD|2020-02-04 18:31:22|  5|    1|2000|
|  2000-01-06|          JUEVES|Jueves en vacacio...|    ORD|2020-02-04 18:31:22|  6|    1|2000|
|  2000-01-07|         VIERNES|Viernes en vacaci...|    ORD|2020-02-04 18:31:22|  7|    1|2000|
|  2000-01-08|          SABADO|SÃ¡bado a

                                                                                

In [10]:
@time_measure
def month(df):
    df = df.withColumn('day2', F.dayofmonth('AtypicalDate'))\
        .withColumn('month2', F.month('AtypicalDate'))\
        .withColumn('year2', F.year('AtypicalDate'))

    return df

In [11]:
df = month(df_atypical_date)

0.09905505180358887


In [12]:
df.show()

[Stage 2:>                                                          (0 + 1) / 1]

+------------+----------------+--------------------+-------+-------------------+---+-----+----+----+------+-----+
|AtypicalDate|AtypicalDateCode|    AtypicalDateName|TypeDay|     XMCreationDate|day|month|year|day2|month2|year2|
+------------+----------------+--------------------+-------+-------------------+---+-----+----+----+------+-----+
|  2000-01-01|          1--ENE|    Primero de enero|   FEST|2020-02-04 18:31:22|  1|    1|2000|   1|     1| 2000|
|  2000-01-02|          2--ENE|        Dos de enero|    ORD|2020-02-04 18:31:22|  2|    1|2000|   2|     1| 2000|
|  2000-01-03|           LUNES|Lunes en vacacion...|    ORD|2020-02-04 18:31:22|  3|    1|2000|   3|     1| 2000|
|  2000-01-04|          MARTES|Martes en vacacio...|    ORD|2020-02-04 18:31:22|  4|    1|2000|   4|     1| 2000|
|  2000-01-05|       MIERCOLES|MiÃ©rcoles en vac...|    ORD|2020-02-04 18:31:22|  5|    1|2000|   5|     1| 2000|
|  2000-01-06|          JUEVES|Jueves en vacacio...|    ORD|2020-02-04 18:31:22|  6|    

                                                                                

In [13]:
@time_measure
def rename(df):
    df = df.withColumn('AtypicalDateCode', F.lower('AtypicalDateCode'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '1--ene', 'ene-1'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '2--ene', 'ene_2'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '1--may', 'may_1'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '20--jul', 'jul_20'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '7--ago', 'ago_7'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '8--dic', 'dic_8'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '24--dic', 'dic_24'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '25--dic', 'dic_25'))\
        .withColumn('AtypicalDateCode', F.regexp_replace('AtypicalDateCode', '31--dic', 'dic_31'))

    return df

In [14]:
df = rename(df)

0.5224709510803223


In [15]:
df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+------------+----------------+--------------------+-------+-------------------+---+-----+----+----+------+-----+
|AtypicalDate|AtypicalDateCode|    AtypicalDateName|TypeDay|     XMCreationDate|day|month|year|day2|month2|year2|
+------------+----------------+--------------------+-------+-------------------+---+-----+----+----+------+-----+
|  2000-01-01|           ene-1|    Primero de enero|   FEST|2020-02-04 18:31:22|  1|    1|2000|   1|     1| 2000|
|  2000-01-02|           ene_2|        Dos de enero|    ORD|2020-02-04 18:31:22|  2|    1|2000|   2|     1| 2000|
|  2000-01-03|           lunes|Lunes en vacacion...|    ORD|2020-02-04 18:31:22|  3|    1|2000|   3|     1| 2000|
|  2000-01-04|          martes|Martes en vacacio...|    ORD|2020-02-04 18:31:22|  4|    1|2000|   4|     1| 2000|
|  2000-01-05|       miercoles|MiÃ©rcoles en vac...|    ORD|2020-02-04 18:31:22|  5|    1|2000|   5|     1| 2000|
|  2000-01-06|          jueves|Jueves en vacacio...|    ORD|2020-02-04 18:31:22|  6|    

                                                                                

In [16]:
import numpy as np

In [17]:
list_date_code = np.array(df.select('AtypicalDateCode').distinct().collect())

                                                                                

In [18]:
list_date_code

array([['vss'],
       ['jueves'],
       ['may_1'],
       ['viernes'],
       ['dss'],
       ['dic_25'],
       ['lunes'],
       ['martes'],
       ['dic_31'],
       ['jul_20'],
       ['sss'],
       ['sabado'],
       ['ene_2'],
       ['domingo'],
       ['ene-1'],
       ['miercoles'],
       ['jss'],
       ['lf'],
       ['ago_7'],
       ['mss'],
       ['dic_8'],
       ['dic_24']], dtype='<U9')

In [19]:
list_date_code_2 = [
    'ene_1', 'ene_2', 'may_1', 'jul_20', 'ago_7', 'dic_8', 'dic_24', 'dic_25',
    'dic_31', 'mss', 'jss', 'vss', 'sss', 'dss', 'lf'
]

list_type_day = np.array(df.select('TypeDay').distinct().collect())
list_type_day

                                                                                

array([['SAB'],
       ['FEST'],
       ['ORD']], dtype='<U4')

In [20]:
print(datetime.now())

2023-01-26 10:49:27.942937


In [36]:
variable_type = 'P'
measuring_element_id = 32
date_now = datetime.now().date()

query_input_measure = f'''
    SELECT * FROM [Forecast].[InputMeasure]
    WHERE [MeasuringElementMRID] = {measuring_element_id}
    AND [MeasuresTypeMRID] = '{variable_type}'
    AND [MeasuringDate] BETWEEN DATEADD(DAY,-123,'{date_now}') AND '{date_now}'
'''

input_measure = read_sql_spark(url_db, query_input_measure, spark)

0.8107640743255615


In [22]:
input_measure.show(10000)

                                                                                

+----------------+-------------------+--------------------+----------------+---------------+--------------------+----------+--------------+--------------------+
|InputMeasureMRID|      MeasuringDate|MeasuringElementMRID|MeasuresTypeMRID|ProcessTypeMRID|OriginTimeSeriesMRID|Resolution|MeasuringValue|      XMCreationDate|
+----------------+-------------------+--------------------+----------------+---------------+--------------------+----------+--------------+--------------------+
|      2010924746|2022-09-25 00:00:00|                  32|               P|             20|                  95|         5|       16.7423|2022-09-25 09:06:...|
|      2010930361|2022-09-25 00:05:00|                  32|               P|             20|                  95|         5|       16.7423|2022-09-25 09:06:...|
|      2010923929|2022-09-25 00:10:00|                  32|               P|             20|                  95|         5|       16.7423|2022-09-25 09:06:...|
|      2010934617|2022-09-25 00:15

In [43]:
@time_measure
def consulta_tabla_validaciones():
    dbtable = '[Forecast].[InputMeasure]'
    df_input_measure = read_sql_spark(url_db, dbtable, spark, is_table=True)
    df_input_measure = df_input_measure.filter((df_input_measure.MeasuringElementMRID == measuring_element_id) & (df_input_measure.MeasuresTypeMRID == variable_type))
    return df_input_measure


spark_validation = consulta_tabla_validaciones()

0.7815642356872559
0.7949070930480957


In [24]:
spark_validation.show()

[Stage 11:>                                                         (0 + 1) / 1]

+----------------+-------------------+--------------------+----------------+---------------+--------------------+----------+--------------+--------------------+
|InputMeasureMRID|      MeasuringDate|MeasuringElementMRID|MeasuresTypeMRID|ProcessTypeMRID|OriginTimeSeriesMRID|Resolution|MeasuringValue|      XMCreationDate|
+----------------+-------------------+--------------------+----------------+---------------+--------------------+----------+--------------+--------------------+
|      1574103493|2021-12-01 00:00:00|                  32|               P|             20|                  95|         5|       20.5242|2022-03-06 17:55:...|
|      1574033929|2021-12-01 00:05:00|                  32|               P|             20|                  95|         5|       20.3771|2022-03-06 17:54:...|
|      1574243003|2021-12-01 00:10:00|                  32|               P|             20|                  95|         5|       20.2366|2022-03-06 17:56:...|
|      1573840090|2021-12-01 00:15

                                                                                