Universidad Autónoma de Nuevo León
Facultad de Ciencias Físico Matemáticas
Maestría en Ciencia de Datos
Student: Francisco Treviño
Date: 2024-10-10

Assignment 2 (10 points). Operations with RDDs
* Obtain data from some source (your own or an API)
* Convert the data source to an RDD with PySpark
* Perform some operation on the RDD, such as basic descriptive statistics
* Upload the exercise to a public repository and label it clearly

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,031 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Ign:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:13 https://ppa.launchpadcontent.net/ubuntugis/p

In [2]:
import os
import sys

import findspark
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [3]:
findspark.init()
findspark.find()
spark = SparkSession.builder.appName("Intraday").getOrCreate()

In [4]:
spark

In [5]:
# Import the required libraries
from pyspark.sql import SparkSession
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
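# Initialize the Spark session. A session already exists from cell In [3], so
# getOrCreate() returns that one instead of creating a new session.
spark = SparkSession.builder.appName("IntradayDataProcessing").getOrCreate()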

In [7]:
# Step 1: List the CSV files in the Google Drive directory
# Make sure this path points to where the files are stored in your Google Drive
csv_directory = "/content/drive/My Drive/UANL/MCD/07 Big Data/"

# Names of the CSV files
csv_file_names = ["intraday_bmv_a.txt", "intraday_bmv_b.txt", "intraday_bmv_c.txt", "intraday_bmv_d.txt"]

# List that holds the loaded DataFrames
df_list = []

# Load the CSV files from Google Drive, specifying that they are TAB-delimited
for csv_file_name in csv_file_names:
    # Read the CSV file from its Google Drive path
    file_path = csv_directory + csv_file_name
    df = spark.read.csv(file_path, sep="\t", header=True, inferSchema=True)
    df_list.append(df)
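
The loop above builds each path with `+`, which only works because `csv_directory` ends with a slash. A minimal sketch of a more defensive variant follows. `build_paths` is a hypothetical helper, not part of the original notebook: it joins paths with `os.path.join` and fails early when a file is missing. `spark.read.csv` also accepts a list of paths, so the result could be read in a single call, assuming all four files share the same header.

```python
import os


def build_paths(directory, file_names):
    """Join each file name onto the directory and fail early if one is missing."""
    paths = [os.path.join(directory, name) for name in file_names]
    missing = [p for p in paths if not os.path.isfile(p)]
    if missing:
        raise FileNotFoundError(f"Missing input files: {missing}")
    return paths


# Possible use in this notebook (assumes `spark`, `csv_directory` and
# `csv_file_names` are defined as in the cells above):
# paths = build_paths(csv_directory, csv_file_names)
# combined_df = spark.read.csv(paths, sep="\t", header=True, inferSchema=True)
```

Reading all files in one call would also make the manual union step unnecessary, though that is a design choice rather than a requirement.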

In [8]:
# Combine all the DataFrames into one (if needed). union() matches columns
# by position, so the files must share the same column order.
combined_df = df_list[0]
for df in df_list[1:]:
    combined_df = combined_df.union(df)

# Show a sample of the combined DataFrame
combined_df.show(5)

+-------------------+------------+-------------+-------------------+------+-----+-------+--------+----------+---------+-----------+-----------------+----------+--------------+-----------------+------------+---+-----+-------+
|         trade_time|match_number|instrument_id|          timestamp|volume|price| amount|buyer_id|buyer_name|seller_id|seller_name|auction_indicator|settlement|operation_type|concertation_type|price_setter|lot|chart| symbol|
+-------------------+------------+-------------+-------------------+------+-----+-------+--------+----------+---------+-----------+-----------------+----------+--------------+-----------------+------------+---+-----+-------+
|2023-01-02 08:30:00|          23|            5|2023-01-02 08:30:00|   102|68.59|6996.18|      61|     SCTIA|       72|        JPM|                -|         -|             -|                -|           1|  1|    1|WALMEX*|
|2023-01-02 08:30:00|          28|            5|2023-01-02 08:30:00|   100|68.59| 6859.0|      12|  

In [9]:
combined_df.printSchema()

root
 |-- trade_time: timestamp (nullable = true)
 |-- match_number: integer (nullable = true)
 |-- instrument_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- volume: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- buyer_id: integer (nullable = true)
 |-- buyer_name: string (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- auction_indicator: string (nullable = true)
 |-- settlement: string (nullable = true)
 |-- operation_type: string (nullable = true)
 |-- concertation_type: string (nullable = true)
 |-- price_setter: integer (nullable = true)
 |-- lot: integer (nullable = true)
 |-- chart: integer (nullable = true)
 |-- symbol: string (nullable = true)



In [10]:
combined_df = combined_df.repartition(10)

In [11]:
# Convert the DataFrame to an RDD
rdd = combined_df.rdd

In [12]:
rdd.take(5)  # Show the first 5 records of the RDD

[Row(trade_time=datetime.datetime(2023, 2, 23, 10, 20, 34), match_number=6109, instrument_id=5, timestamp=datetime.datetime(2023, 2, 23, 10, 20, 34), volume=100, price=71.3, amount=7130.0, buyer_id=134, buyer_name='FMX', seller_id=12, seller_name='GBM', auction_indicator='-', settlement='-', operation_type='-', concertation_type='-', price_setter=1, lot=1, chart=1, symbol='WALMEX*'),
 Row(trade_time=datetime.datetime(2023, 1, 6, 11, 22, 2), match_number=6745, instrument_id=5, timestamp=datetime.datetime(2023, 1, 6, 11, 22, 2), volume=750, price=71.8, amount=53850.0, buyer_id=120, buyer_name='CICB', seller_id=17, seller_name='INVEX', auction_indicator='-', settlement='-', operation_type='-', concertation_type='-', price_setter=1, lot=1, chart=1, symbol='WALMEX*'),
 Row(trade_time=datetime.datetime(2023, 3, 24, 13, 53, 15), match_number=17748, instrument_id=5, timestamp=datetime.datetime(2023, 3, 24, 13, 53, 15), volume=100, price=72.9, amount=7290.0, buyer_id=134, buyer_name='FMX', sell

In [13]:
combined_df.describe().show()

+-------+-----------------+-------------+-----------------+-----------------+-----------------+------------------+----------+-----------------+-----------+-----------------+------------------+--------------+-----------------+------------------+------------------+------------------+-------+
|summary|     match_number|instrument_id|           volume|            price|           amount|          buyer_id|buyer_name|        seller_id|seller_name|auction_indicator|        settlement|operation_type|concertation_type|      price_setter|               lot|             chart| symbol|
+-------+-----------------+-------------+-----------------+-----------------+-----------------+------------------+----------+-----------------+-----------+-----------------+------------------+--------------+-----------------+------------------+------------------+------------------+-------+
|  count|          8206976|      8206976|          8206976|          8206976|          8206976|           8206976|   8206555|  

In [15]:
combined_df.filter(combined_df.buyer_id.isin([149, 50])).orderBy(F.asc("trade_time")).limit(100).show()

+-------------------+------------+-------------+-------------------+------+-----+-------+--------+----------+---------+-----------+-----------------+----------+--------------+-----------------+------------+---+-----+-------+
|         trade_time|match_number|instrument_id|          timestamp|volume|price| amount|buyer_id|buyer_name|seller_id|seller_name|auction_indicator|settlement|operation_type|concertation_type|price_setter|lot|chart| symbol|
+-------------------+------------+-------------+-------------------+------+-----+-------+--------+----------+---------+-----------+-----------------+----------+--------------+-----------------+------------+---+-----+-------+
|2023-04-03 07:30:00|          41|            5|2023-04-03 07:30:00|     1| 72.2|   72.2|     149|       GBM|       14|        FMX|                -|         4|             C|                O|           0|  0|    0|WALMEX*|
|2023-04-03 07:30:00|          39|            5|2023-04-03 07:30:00|     4| 72.2|  288.8|     149|  

In [17]:
from pyspark.sql import functions as F

# Group by the trade date (the day extracted from 'trade_time') and 'buyer_id', then sum 'volume'
combined_df.groupBy(F.to_date("trade_time").alias("trade_date"), "buyer_id") \
           .sum("volume") \
           .orderBy(F.desc("trade_date")) \
           .show(truncate=False)

+----------+--------+-----------+
|trade_date|buyer_id|sum(volume)|
+----------+--------+-----------+
|2024-09-19|141     |454422     |
|2024-09-19|123     |550419     |
|2024-09-19|137     |28771      |
|2024-09-19|38      |162459     |
|2024-09-19|118     |117752     |
|2024-09-19|112     |56625      |
|2024-09-19|28      |1200000    |
|2024-09-19|54      |97218      |
|2024-09-19|29      |4285       |
|2024-09-19|24      |61580      |
|2024-09-19|0       |4879922    |
|2024-09-19|136     |670776     |
|2024-09-19|138     |215176     |
|2024-09-19|113     |859455     |
|2024-09-19|14      |1994679    |
|2024-09-19|140     |100000     |
|2024-09-19|121     |9213       |
|2024-09-19|119     |73102      |
|2024-09-19|51      |1127249    |
|2024-09-18|29      |7088       |
+----------+--------+-----------+
only showing top 20 rows
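
The aggregation above uses the DataFrame API. Since the assignment is about RDDs, the same grouping can also be expressed as a key-value map followed by a reduce by key. The sketch below imitates that pattern in plain Python on a few made-up rows, so it runs without a Spark session. `to_pair` and `reduce_by_key` are illustrative names, and the closing comment shows how the equivalent call would probably look on `rdd`.

```python
from datetime import datetime
from operator import add

# Hypothetical rows: (trade_time, buyer_id, volume). Not taken from the real data.
rows = [
    (datetime(2023, 1, 2, 8, 30, 0), 61, 102),
    (datetime(2023, 1, 2, 8, 30, 0), 12, 100),
    (datetime(2023, 1, 2, 9, 15, 0), 61, 50),
    (datetime(2023, 1, 3, 10, 0, 0), 61, 10),
]


def to_pair(row):
    """Map a row to ((trade_date, buyer_id), volume)."""
    trade_time, buyer_id, volume = row
    return ((trade_time.date(), buyer_id), volume)


def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like RDD.reduceByKey does."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out


totals = reduce_by_key(map(to_pair, rows), add)
for (trade_date, buyer_id), volume in sorted(totals.items()):
    print(trade_date, buyer_id, volume)

# Assumed equivalent on the notebook's RDD (requires non-null trade_time values):
# rdd.map(lambda r: ((r["trade_time"].date(), r["buyer_id"]), r["volume"])) \
#    .reduceByKey(add)
```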



In [18]:
# Extract the price column for the basic descriptive statistics
price_rdd = rdd.map(lambda row: row["price"])

# Filter out null values
price_rdd = price_rdd.filter(lambda x: x is not None)

In [19]:
# Compute basic descriptive statistics (each call below is a separate Spark action)
count = price_rdd.count()
mean = price_rdd.mean()
min_value = price_rdd.min()
max_value = price_rdd.max()
stddev = price_rdd.stdev()  # population standard deviation (divides by N)

# Print the results
print(f"Count: {count}")
print(f"Mean: {mean}")
print(f"Min: {min_value}")
print(f"Max: {max_value}")
print(f"Standard Deviation: {stddev}")

Count: 8206976
Mean: 67.09937394856425
Min: 58.69
Max: 76.42
Standard Deviation: 3.837460223585266
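
Two details about these numbers are worth noting. First, `RDD.stdev()` is the population standard deviation, while `RDD.sampleStdev()` and the `stddev` row of `DataFrame.describe()` divide by N - 1. With about 8.2 million rows the difference is negligible. Second, `price_rdd.stats()` returns count, mean, min, max and standard deviation from a single pass over the data. The sketch below illustrates how such a pass can work: each partition keeps a small summary, and the summaries are then merged. `Summary` and `summarize` are hypothetical names written for this example, not Spark's own implementation, and the prices are made up.

```python
import math
import statistics
from dataclasses import dataclass


@dataclass
class Summary:
    """Running count, mean, sum of squared deviations (m2), min and max."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0
    lo: float = math.inf
    hi: float = -math.inf

    def add(self, x):
        # Welford's update for a single new value.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)
        self.lo = min(self.lo, x)
        self.hi = max(self.hi, x)
        return self

    def merge(self, other):
        # Combine two partial summaries without revisiting the raw values.
        if other.n == 0:
            return self
        if self.n == 0:
            return other
        n = self.n + other.n
        delta = other.mean - self.mean
        mean = self.mean + delta * other.n / n
        m2 = self.m2 + other.m2 + delta * delta * self.n * other.n / n
        return Summary(n, mean, m2, min(self.lo, other.lo), max(self.hi, other.hi))

    def stdev(self):
        # Population standard deviation (divide by n).
        return math.sqrt(self.m2 / self.n)

    def sample_stdev(self):
        # Sample standard deviation (divide by n - 1).
        return math.sqrt(self.m2 / (self.n - 1))


def summarize(values):
    """Build a Summary from an iterable of numbers."""
    s = Summary()
    for v in values:
        s.add(v)
    return s


# Hypothetical prices split into two "partitions".
part_a = [68.59, 68.59, 71.3]
part_b = [71.8, 72.9, 72.2]

total = summarize(part_a).merge(summarize(part_b))
print(total.n, total.mean, total.lo, total.hi)
print(total.stdev(), statistics.pstdev(part_a + part_b))
print(total.sample_stdev(), statistics.stdev(part_a + part_b))
```

The merged summary should agree with `statistics.pstdev` and `statistics.stdev` computed over the concatenated list, up to floating-point rounding.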
