In [1]:
import os
from pyspark.sql import SparkSession

# Definir o caminho do Spark corretamente
os.environ['SPARK_HOME'] = '/home/daiane/spark-3.5.1-bin-hadoop3/'

# Definir o caminho do Java corretamente
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.17.0-openjdk-amd64/'

# Iniciar uma sessão Spark
spark = SparkSession.builder \
    .appName("Exemplo Spark") \
    .getOrCreate()

# Testar a sessão Spark
spark


24/04/23 10:53:36 WARN Utils: Your hostname, victor-Lenovo-ideapad-330-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.1.74 instead (on interface wlp2s0)
24/04/23 10:53:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/23 10:53:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



Este projeto prático foca na transformação e integração de dados usando PySpark. <br>
Vou trabalhar com dois conjuntos de dados, realizar várias transformações como adicionar colunas, renomear colunas, eliminar colunas<br>
desnecessárias, unir dataframes e, por fim, escrever os resultados tanto em um armazém Hive quanto em um sistema de arquivos HDFS.


In [3]:
!pip install wget pyspark  findspark


Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25ldone
[?25h  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=0091535511e2ccaff1432695ddb07ab91ba643ec54d8452292aa6f4a76131f15
  Stored in directory: /home/daiane/.cache/pip/wheels/40/b3/0f/a40dbd1c6861731779f62cc4babcb234387e11d697df70ee97
Successfully built wget
Installing collected packages: wget, findspark
Successfully installed findspark-2.0.1 wget-3.2


In [18]:
import findspark

findspark.init()

In [47]:
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [48]:
#Spark context 

sc = SparkContext.getOrCreate()

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Task 1: Carregar datasets em DataFrames do PySpark. 
1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv  
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv  

In [21]:
import wget
link_1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
link_2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_1)
wget.download(link_2)

100% [............................................................] 2688 / 2688

'dataset2 (1).csv'

In [49]:
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

### Task 2: Exibir o esquema de ambos os dataframes

In [50]:
df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



### Task3: Adicionar uma nova coluna a cada dataframe, ano para df1 e trimestre para df2

In [51]:
from pyspark.sql.functions import year, quarter, to_date
df1 = df1.withColumn('year', year(to_date('date_column', 'dd/MM/yyyy')))
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))

### Task 4: Renomear a coluna "amount" para para "transaction" em df1 e a coluna "value" para "transaction_value" em df2

In [52]:
df1 = df1.withColumnRenamed('amount', 'transaction_amount')
df2 = df2.withColumnRenamed('value', 'transaction_value')

### Task 5: Exclua as colunas desnecessárias "description" e "location" de df1 e "notes" de df2

In [53]:
df1 = df1.drop('description', 'location')
df2 = df2.drop('notes')

### Task 6: Junte df1 e df2 com base na coluna comum customer_id e crie um novo dataframe chamado joined_df.

In [55]:
joined_df = df1.join(df2, 'customer_id')

### Task 7: Filtrar joined_df para incluir apenas transações onde "transaction_amount" é maior que 1000 e criar um novo dataframe.

In [56]:
filtered_df = joined_df.filter("transaction_amount > 1000")

### Task 8: Agregação de dados por cliente, calcular o valor total das transações para cada cliente em filtered_df.

In [57]:
from pyspark.sql.functions import sum
total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

In [58]:
total_amount_per_customer.show()

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows



### Task 9:Escrever "total_amount_per_customer" em uma tabela Hive chamada "customer_totals"

In [59]:
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")

                                                                                

### Task 10: Escrever "filtered_df" no HDFS no formato parquet em um arquivo chamado filtered_data

In [61]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")


### Task 11: Adicionar uma nova coluna chamada high_value ao df1, indicando se o transaction_amount é maior que 5000

In [64]:
from pyspark.sql.functions import when, lit
df1 = df1.withColumn('high_value', when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))

### Task 12: cacular o valor médio da transação para cada trimestre no df2 e criar um novo dataframe chamado average_value_per_quarter com a coluna avg_trans_val

In [72]:
from pyspark.sql.functions import avg
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

In [73]:
average_value_per_quarter.show()

+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



# Task 13: Salvar o resultado em uma tabela Hive

In [74]:
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

### Task 14: Calcular o valor de tansação por ano

In [75]:
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

In [76]:
total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
|2029|                25700|
|2030|                 9500|
|2028|                25700|
|2024|                25700|
+----+---------------------+



### Escrever o total por ano no HDFS em um CSV

In [77]:
total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")