# <br>**Spark-Hadoop_Local-Docker-Cluster**<br>

### <br>**1 - Installing the HDFS client library**<br>

In [1]:
!pip install hdfs --quiet

### <br>**2 - Required imports**<br>

In [2]:
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count

### <br>**3 - Connection | Directory creation | Uploading data to the Hadoop cluster**<br>

In [3]:
hdfs_client = InsecureClient("http://172.25.0.2:9870", user="hadoop")

In [4]:
hdfs_client.makedirs("/user/dfs_data", permission=777)

In [5]:
hdfs_client.upload(hdfs_path="/user/dfs_data", local_path="dataset.parquet", overwrite=True)

'/user/dfs_data/dataset.parquet'

### <br>**4 - Starting the Spark cluster session**<br>

In [6]:
spark = (
    SparkSession.builder
    .appName("pyspark-parquet-experiment")
    .master("spark://172.25.0.8:7077")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)
spark

### <br>**5 - Loading data from Hadoop (HDFS) into the Spark cluster**<br>

In [7]:
df = spark.read.parquet("hdfs://namenode:8020/user/dfs_data/dataset.parquet")
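The notebook reaches the NameNode through two different endpoints: the `hdfs` Python client talks WebHDFS over HTTP (port 9870 here), while Spark reads through an `hdfs://` filesystem URI (port 8020 here). A minimal sketch of a helper that builds the Spark-side URI is shown below; `hdfs_uri` is a hypothetical name, and the host and port defaults are simply the values used in this notebook.

```python
def hdfs_uri(path: str, host: str = "namenode", port: int = 8020) -> str:
    """Build an hdfs:// URI for an absolute HDFS path (hypothetical helper).

    The defaults mirror the values used in this notebook; other clusters
    may expose the NameNode under a different host name or port.
    """
    if not path.startswith("/"):
        raise ValueError("expected an absolute HDFS path, e.g. /user/dfs_data")
    return f"hdfs://{host}:{port}{path}"


# The path uploaded in step 3, expressed as a URI Spark can read.
dataset_uri = hdfs_uri("/user/dfs_data/dataset.parquet")
print(dataset_uri)
```

The resulting string can be passed to `spark.read.parquet`, which keeps the host and port in one place if the cluster layout changes.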

### <br>**6 - In-memory data processing (Spark cluster)**<br>

In [8]:
df.show(10)

+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|          Timestamp|From Bank|  Account|To Bank|Account.1|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022-08-01 00:15:00|       20|800104D70|     20|800104D70|        8095.07|         US Dollar|    8095.07|       US Dollar|  Reinvestment|            0|
|2022-08-01 00:18:00|     3196|800107150|   3196|800107150|        7739.29|         US Dollar|    7739.29|       US Dollar|  Reinvestment|            0|
|2022-08-01 00:23:00|     1208|80010E430|   1208|80010E430|        2654.22|         US Dollar|    2654.22|       US Dollar|  Reinvestment|            0|
|2022-08-01 00:19:00|     3203|80010EA80|   3203|80010EA80|       13284.41|       

In [9]:
df.printSchema()

root
 |-- Timestamp: timestamp_ntz (nullable = true)
 |-- From Bank: integer (nullable = true)
 |-- Account: string (nullable = true)
 |-- To Bank: integer (nullable = true)
 |-- Account.1: string (nullable = true)
 |-- Amount Received: float (nullable = true)
 |-- Receiving Currency: string (nullable = true)
 |-- Amount Paid: float (nullable = true)
 |-- Payment Currency: string (nullable = true)
 |-- Payment Format: string (nullable = true)
 |-- Is Laundering: integer (nullable = true)



In [10]:
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")

Number of rows: 10000000
Number of columns: 11


In [11]:
# Replace spaces and dots in column names with underscores.
df = (
    df.withColumnRenamed("From Bank", "From_Bank")
    .withColumnRenamed("To Bank", "To_Bank")
    .withColumnRenamed("Account.1", "Account_1")
    .withColumnRenamed("Amount Received", "Amount_Received")
    .withColumnRenamed("Receiving Currency", "Receiving_Currency")
    .withColumnRenamed("Amount Paid", "Amount_Paid")
    .withColumnRenamed("Payment Currency", "Payment_Currency")
    .withColumnRenamed("Payment Format", "Payment_Format")
    .withColumnRenamed("Is Laundering", "Is_Laundering")
)
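The renames above all follow one rule: spaces and dots become underscores. As a sketch of an alternative, the same rule can be applied to every column at once. `sanitize_column` and `sanitize_columns` are hypothetical helper names, and the regular expression is an assumption about which characters should be replaced.

```python
import re


def sanitize_column(name: str) -> str:
    """Replace every character that is not a letter, digit or underscore."""
    return re.sub(r"[^0-9A-Za-z_]", "_", name)


def sanitize_columns(df):
    """Return a DataFrame with all column names sanitized.

    `df` is expected to be a PySpark DataFrame; `toDF` returns a new
    DataFrame with the given column names, in order.
    """
    return df.toDF(*[sanitize_column(c) for c in df.columns])


# The rule applied to a few of the original column names.
for original in ["From Bank", "Account.1", "Timestamp"]:
    print(original, "->", sanitize_column(original))
```

With this in place, `df = sanitize_columns(df)` would stand in for the chain of `withColumnRenamed` calls, at the cost of being less explicit about which columns change.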

In [12]:
df.show(5)

+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|          Timestamp|From_Bank|  Account|To_Bank|Account_1|Amount_Received|Receiving_Currency|Amount_Paid|Payment_Currency|Payment_Format|Is_Laundering|
+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022-08-01 00:15:00|       20|800104D70|     20|800104D70|        8095.07|         US Dollar|    8095.07|       US Dollar|  Reinvestment|            0|
|2022-08-01 00:18:00|     3196|800107150|   3196|800107150|        7739.29|         US Dollar|    7739.29|       US Dollar|  Reinvestment|            0|
|2022-08-01 00:23:00|     1208|80010E430|   1208|80010E430|        2654.22|         US Dollar|    2654.22|       US Dollar|  Reinvestment|            0|
|2022-08-01 00:19:00|     3203|80010EA80|   3203|80010EA80|       13284.41|       
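The imports in step 2 bring in `col`, `isnan`, `when` and `count`, which are commonly combined to count missing values per column. The following is a sketch of that pattern, not a cell from the original notebook: `nan_capable_columns` and `missing_counts` are hypothetical names, and the sketch assumes that `isnan` should only be applied to `float` and `double` columns.

```python
def nan_capable_columns(dtypes):
    """Pick the columns whose Spark type can hold NaN.

    `dtypes` is a list of (name, type) pairs, as returned by `df.dtypes`.
    """
    return [name for name, dtype in dtypes if dtype in ("float", "double")]


def missing_counts(df):
    """Return a one-row DataFrame with the number of missing values per column."""
    # Imported here so the helper above can be used without PySpark installed.
    from pyspark.sql.functions import col, count, isnan, when

    float_columns = set(nan_capable_columns(df.dtypes))
    expressions = []
    for name in df.columns:
        ref = col(f"`{name}`")  # backticks keep names with dots or spaces valid
        is_missing = ref.isNull()
        if name in float_columns:
            is_missing = is_missing | isnan(ref)
        # `when` yields NULL for non-matching rows, and `count` skips NULLs.
        expressions.append(count(when(is_missing, 1)).alias(name))
    return df.select(expressions)
```

Calling `missing_counts(df).show()` would trigger a full pass over the data, so on a large dataset it is worth running once rather than per column.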

### <br>**7 - RDD processing - Resilient Distributed Dataset**<br>

In [13]:
rdd_test = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

In [14]:
rdd_test.count()

8

In [15]:
rdd = df.rdd  # view the DataFrame as an RDD (Resilient Distributed Dataset) of Row objects

In [16]:
rdd

MapPartitionsRDD[24] at javaToPython at NativeMethodAccessorImpl.java:0

### <br>**8 - Writing the data to Hadoop (HDFS) in partitions**<br>

In [17]:
df = df.repartition(5)
df.write.parquet("hdfs://namenode:8020/user/dfs_data/dataset_2.parquet")
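`df.write.parquet` uses Spark's default save mode, which raises an error when the target path already exists, so re-running this cell fails unless the output is deleted first (as step 10 does). A minimal sketch of a re-runnable variant follows; `write_partitioned` is a hypothetical helper, and choosing the `overwrite` mode is an assumption about the desired behavior.

```python
def write_partitioned(df, path: str, num_partitions: int = 5) -> None:
    """Write `df` as Parquet into `num_partitions` part files.

    `df` is expected to be a PySpark DataFrame. The "overwrite" mode
    replaces any existing output at `path` instead of raising an error.
    """
    df.repartition(num_partitions).write.mode("overwrite").parquet(path)
```

Used as `write_partitioned(df, "hdfs://namenode:8020/user/dfs_data/dataset_2.parquet")`, this produces the same five part files as the cell above. Note that `overwrite` silently discards the previous output, which may not be appropriate for data that is not reproducible.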

### <br>**9 - Loading the written data back into the Spark cluster**<br>

In [18]:
# Parquet files carry their own schema, so the CSV-style `header` option is not needed.
df_2 = spark.read.parquet("hdfs://namenode:8020/user/dfs_data/dataset_2.parquet")

In [19]:
df_2.show(10)

+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|          Timestamp|From_Bank|  Account|To_Bank|Account_1|Amount_Received|Receiving_Currency|Amount_Paid|Payment_Currency|Payment_Format|Is_Laundering|
+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022-08-01 03:22:00|     3196|802AA8590|   3613|807F26840|         173.37|              Euro|     173.37|            Euro|          Wire|            0|
|2022-08-01 03:58:00|    20462|813F5A020|  20462|813F5A020|       13517.51|         US Dollar|   13517.51|       US Dollar|  Reinvestment|            0|
|2022-08-01 23:28:00|   155202|839721E60| 155202|839721E60|        2895.31|       Brazil Real|    2895.31|     Brazil Real|  Reinvestment|            0|
|2022-08-02 08:31:00|   219871|817CC2FF0| 184776|835106FC0|        2034.14|       

### <br>**10 - Deleting data from Hadoop (HDFS)**<br>

In [20]:
hdfs_client.delete("/user/dfs_data/dataset_2.parquet", recursive=True)

True

### <br>**11 - Stopping the Spark session**<br>

In [21]:
spark.stop()
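In a notebook, `spark.stop()` runs only if every earlier cell succeeded and this cell is executed. When the same steps are moved into a script, one way to make sure the session is always released is a small context manager. This is a sketch under assumptions: `spark_session` is a hypothetical name, and `factory` stands for any zero-argument callable that returns an object with a `stop()` method, such as a function wrapping the builder from step 4.

```python
from contextlib import contextmanager


@contextmanager
def spark_session(factory):
    """Yield the session created by `factory` and always stop it afterwards."""
    session = factory()
    try:
        yield session
    finally:
        # Runs on normal exit and when the body raises an exception.
        session.stop()
```

Usage would look like `with spark_session(make_session) as spark: ...`, where `make_session` is a hypothetical function returning `SparkSession.builder...getOrCreate()`.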