In [None]:
# En caso de trabajar en otra plataforma que no sea Databricks
# instalar la librería PySpark
# pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [None]:
# Iniciar Spark en otra plataforma que no sea Databricks
# spark = SparkSession.builder.appName('DataFrame').getOrCreate()

Conexión al Data Lake y lectura de archivos

In [None]:
container = '' # Azure Data Lake container name
datalake = '' # Azure Data Lake instance (or Storage Account) name

In [None]:
access_key = '*******' # Azure Data Lake access key
# NOTE(review): avoid committing a real key in this cell; consider loading it from a secret store.
# Register the account key in the Spark session so abfss:// paths on this storage account can be read.
spark.conf.set(f'fs.azure.account.key.{datalake}.dfs.core.windows.net', access_key)

In [None]:
# Base names of the source files; each one is used to build a file path and a DataFrame name.
file_names = ['Company_Tweet', 'Company', 'CompanyValues', 'Tweet']

In [None]:
# Load every raw CSV into a notebook-level DataFrame named df_<name> and keep
# its source path in PATH_<name>. The names are bound through globals() instead
# of exec(), so no source code is assembled from interpolated strings.
for name in file_names:
    globals()[f'PATH_{name}'] = (
        f'abfss://{container}@{datalake}.dfs.core.windows.net/RawData/2023-05-23/{name}.csv'
    )
    # Pipe-separated CSV with a header row; column types are inferred by Spark.
    globals()[f'df_{name}'] = spark.read.load(
        globals()[f'PATH_{name}'],
        format='csv',
        header='true',
        sep='|',
        inferSchema=True,
    )

Casteos y limpieza de nulos

In [None]:
# Helper that adds an auxiliary data-type validation column
def validacion_numero(tabla, columna):
  """Return ``tabla`` with an extra boolean column named ``<columna>_type``.

  The flag is True when the value of ``columna`` casts to ``int`` without
  becoming null, or when the value is null to begin with; otherwise False.
  """
  es_entero = col(columna).cast('int').isNotNull()
  es_nulo = col(columna).isNull()
  return tabla.withColumn(f'{columna}_type', es_entero | es_nulo)

In [None]:
# Tweet counter columns that are validated here and cast to int later on.
column_int = ['comment_num', 'retweet_num', 'like_num']

# Add one <column>_type validation flag per counter column.
for column in column_int:
  df_Tweet = validacion_numero(df_Tweet, column)

In [None]:
# Total number of rows before any filter is applied
df_Tweet.count()

3717964

In [None]:
# Number of rows where at least one counter column failed the integer validation
df_Tweet.where(
    ~col('comment_num_type') |
    ~col('retweet_num_type') |
    ~col('like_num_type')
).count()

42943

In [None]:
# Number of rows where all three counter columns passed the integer validation
df_Tweet.where(
    col('comment_num_type') &
    col('retweet_num_type') &
    col('like_num_type')
).count()

3675021

In [None]:
# Keep only the rows whose three counter columns passed the integer validation
df_Tweet = df_Tweet.filter(
    col('comment_num_type') &
    col('retweet_num_type') &
    col('like_num_type')
)

In [None]:
# Number of rows with a null in at least one of the counter columns
df_Tweet.where(
    col('comment_num').isNull() |
    col('retweet_num').isNull() |
    col('like_num').isNull()
).count()

14

In [None]:
# Keep only the rows where none of the counter columns is null
df_Tweet = df_Tweet.where(
    col('comment_num').isNotNull() &
    col('retweet_num').isNotNull() &
    col('like_num').isNotNull()
)

In [None]:
# Keep only the main columns (this discards the auxiliary *_type flags)
df_Tweet = df_Tweet.select([
    'tweet_id',
    'writer',
    'post_date',
    'body',
    'comment_num',
    'retweet_num',
    'like_num',
])

In [None]:
# Helper that casts a column to integer
def cast_int(tabla, columna):
  """Return ``tabla`` with ``columna`` replaced by its value cast to ``int``."""
  como_entero = col(columna).cast('int')
  return tabla.withColumn(columna, como_entero)

In [None]:
# Cast every counter column listed in column_int to int.
for column in column_int:
  df_Tweet = cast_int(df_Tweet, column)

In [None]:
# Convert post_date to a timestamp column.
# assumes post_date holds Unix epoch seconds — TODO confirm against the raw data
# NOTE(review): from_unixtime renders the value in the Spark session time zone, so the
# result is UTC only when the session time zone is UTC — confirm.
df_Tweet = df_Tweet.withColumn('post_date', (from_unixtime(col('post_date'))).
                               cast('timestamp'))

In [None]:
# Flag column meant to indicate whether the market was open when the tweet was posted.
# True when the hour of post_date is between 14 and 21 inclusive, False otherwise
# (rows where the comparison evaluates to null also fall into the False branch).
# NOTE(review): the inclusive upper bound keeps the whole 21:xx hour — confirm this
# matches the intended trading hours and time zone.
df_Tweet = df_Tweet.withColumn('open_market',
                               when((hour('post_date') >= 14) &
                                    (hour('post_date') <= 21), True).
                               otherwise(False))


Cálculos de variaciones y marginalidad

In [None]:
# Window used to read column values from the previous record of the same company.
# Rows are partitioned by ticker and ordered chronologically, so lag() returns the
# preceding record of that ticker (and null on each ticker's first record).
# The original ordered by 'ticker_symbol' twice, which left the order of rows
# inside a ticker undefined.
# NOTE(review): assumes the date column of CompanyValues is named 'day_date' —
# confirm against the CSV header.
windowSpec = Window.partitionBy('ticker_symbol').orderBy('day_date')

In [None]:
# Legacy cell: the author marked it as old / no longer needed. The loop over
# `previous_column` later in the notebook computes these same lag columns.
# The third column was misspelled 'previus_volume', which left a stray column in
# the DataFrame; it is now spelled 'previous_volume' so the later loop overwrites it.
df_CompanyValues = df_CompanyValues.withColumn('previous_open_value', lag(col('open_value')).over(windowSpec))\
                  .withColumn('previous_ticker_symbol', lag(col('ticker_symbol')).over(windowSpec))\
                  .withColumn('previous_volume', lag(col('volume')).over(windowSpec))

In [None]:
def previous_value(tabla, columna):
  """Return ``tabla`` with a new column ``previous_<columna>``.

  The new column holds the value of ``columna`` from the preceding row, as
  defined by the notebook-level ``windowSpec`` window.
  """
  valor_anterior = lag(col(columna)).over(windowSpec)
  return tabla.withColumn(f'previous_{columna}', valor_anterior)

In [None]:
# Columns for which a previous_<column> lag column is created.
previous_column = ['open_value', 'ticker_symbol', 'volume']

# Add the lag column for each entry using previous_value().
for column in previous_column:
  df_CompanyValues = previous_value(df_CompanyValues, column)

In [None]:
# Variation columns. Each one is computed only when the previous row belongs to
# the same ticker; otherwise the value is left null.
df_CompanyValues = (
    df_CompanyValues
    # v_open_diaria: open price minus the previous row's open price, rounded to 2 decimals
    .withColumn(
        'v_open_diaria',
        when(
            col('ticker_symbol') == col('previous_ticker_symbol'),
            round((col('open_value') - col('previous_open_value')), 2),
        ).otherwise(None),
    )
    # v_open: close price minus open price of the same row, rounded to 2 decimals
    .withColumn(
        'v_open',
        when(
            col('ticker_symbol') == col('previous_ticker_symbol'),
            round((col('close_value') - col('open_value')), 2),
        ).otherwise(None),
    )
    # v_volume: volume minus the previous row's volume, rounded to 2 decimals
    .withColumn(
        'v_volume',
        when(
            col('ticker_symbol') == col('previous_ticker_symbol'),
            round((col('volume') - col('previous_volume')), 2),
        ).otherwise(None),
    )
    # mg_open_volume: ratio of the price variation (v_open) to the volume variation (v_volume)
    .withColumn(
        'mg_open_volume',
        when(
            col('ticker_symbol') == col('previous_ticker_symbol'),
            (col('v_open') / col('v_volume')),
        ).otherwise(None),
    )
)

In [None]:
# Drop the auxiliary previous_<column> lag columns now that the variations are computed.
# The original dropped the source columns themselves (open_value, ticker_symbol, volume),
# removing the company identifier and the raw values from the output.
for column in previous_column:
  df_CompanyValues = df_CompanyValues.drop(f'previous_{column}')

In [None]:
# Write every DataFrame to its own Trusted/<name> path in the data lake, in Parquet
# format, overwriting any existing output.
# The loop iterates `file_names`; the original referenced the undefined name
# `file_name`, which raised NameError. The DataFrame is looked up through
# globals() instead of building source code for exec().
for tabla in file_names:
    globals()[f'df_{tabla}'].write.save(
        path=f'abfss://{container}@{datalake}.dfs.core.windows.net/Trusted/{tabla}',
        format='parquet',
        mode='overwrite',
    )