<a href="https://colab.research.google.com/github/alchemistcohen/Practica-PySpark/blob/main/Practica_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Practica PySpark

Este proyecto práctico se centra en la transformación e integración de datos mediante PySpark. Trabajamos con dos conjuntos de datos, realizamos diversas transformaciones como agregar, renombrar y eliminar columnas innecesarias, unir dataframes y, finalmente, escribimos los resultados tanto en un almacén de datos Hive como en un sistema de archivos HDFS.

# Instalar Librerias requeridas

In [1]:
!pip install wget pyspark  findspark

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=5788899f0adc6815ddc6515c4a5f051b2834e326e8d538a71a3912951450eb22
  Stored in directory: /root/.cache/pip/wheels/01/46/3b/e29ffbe4ebe614ff224bad40fc6a5773a67a163251585a13a9
Successfully built wget
Installing collected packages: wget, findspark
Successfully installed findspark-2.0.1 wget-3.2


In [7]:
!apt-get install openjdk-11-jdk -y

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  x11-utils
Suggested packages:
  default-jre pcscd libxt-doc openjdk-11-demo openjdk-11-source visualvm
  libnss-mdns fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre
  openjdk-11-jre-headless x11-utils
0 upgraded, 15 newly installed, 0 to remove and 41 not upgraded.
Need to get 122 MB of archives.


In [9]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"

In [10]:
import findspark

findspark.init()

In [11]:
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

# SparkContext object y Spark Session

In [12]:
sc = SparkContext.getOrCreate()

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Cargar conjuntos de datos en DataFrames de PySpark

https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv
https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv


In [13]:
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

'dataset2.csv'

Cargar los datos en un Dataframe

In [14]:
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

## Esquema de df1 y df2 para comprender la estructura de los conjuntos de datos.

In [15]:
df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



## Agregando una nueva columna a cada dataframe

Agregamos una nueva columna llamada **año** a `df1` y **trimestre** a `df2` que represente el año y el trimestre de los datos.

In [16]:
from pyspark.sql.functions import year, quarter, to_date

# Agregar nueva columna 'year' a df1
df1 = df1.withColumn('year', year(to_date('date_column', 'dd/MM/yyyy')))

# Agregar nueva columna 'quarter' a df2
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date', 'dd/MM/yyyy')))


# Cambiamos el nombre de la columna amount a transaction_amount en df1 y value a transaction_value en df2.

In [17]:
#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')

#Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')

# Eliminar columnas innecesarias
 Eliminar la descripción y ubicación de las columnas de df1 y las notas de df2.

In [18]:

df1 = df1.drop('description', 'location')


df2 = df2.drop('notes')

## Unir dataframes basándose en una columna común

Unir df1 y df2 basándose en la columna común customer_id y crear un nuevo dataframe llamado joined_df.

In [19]:
joined_df = df1.join(df2, 'customer_id', 'inner')

## Filtrar datos según una condición
Filtrar joined_df para incluir solo las transacciones donde "transaction_amount" sea mayor que 1000 y crear un nuevo dataframe llamado filtered_df.

In [20]:
filtered_df = joined_df.filter("transaction_amount > 1000")

## Agrega datos por cliente
 Calcula el importe total de las transacciones para cada cliente en filtered_df y muestra el resultado.

In [21]:
from pyspark.sql.functions import sum


total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))


total_amount_per_customer.show()

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows



## Escribimos el resultado en una tabla de Hive
Escribimos total_amount_per_customer en una tabla de Hive llamada customer_totals.

In [22]:
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")

## Escribimos los datos filtrados en HDFS

Escribimos filtered_df en HDFS en formato Parquet en un archivo llamado filtered_data.

In [24]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")

## Agregamos una nueva columna según una condición

Agregar una nueva columna llamada high_value a df1 que indique si transaction_amount es mayor que 5000. Cuando el valor sea mayor que 5000, el valor de la columna debe ser Sí. Cuando el valor sea menor o igual a 5000, el valor de la columna debe ser No.

In [25]:
from pyspark.sql.functions import when, lit

df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))

## Calculamos el valor promedio de las transacciones por trimestre

 Calcular y mostrar el valor promedio de las transacciones para cada trimestre en df2 y crear un nuevo dataframe llamado average_value_per_quarter con la columna avg_trans_val.

In [26]:
from pyspark.sql.functions import avg

average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

average_value_per_quarter.show()

+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



## Escribimos el resultado en una tabla de Hive

Escribimos average_value_per_quarter en una tabla de Hive llamada quarterly_averages.

In [27]:
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

## Calculamos el valor total de las transacciones por año

 Calcular y mostrar el valor total de las transacciones para cada año en df1 y crear un nuevo dataframe llamado total_value_per_year con la columna total_transaction_val.

In [28]:
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
|2029|                25700|
|2030|                 9500|
|2028|                25700|
|2024|                25700|
+----+---------------------+



## Escribimos el resultado en HDFS

Escribimos total_value_per_year en HDFS en formato CSV en un archivo llamado total_value_per_year.

In [29]:
total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")