<a href="https://colab.research.google.com/github/arthursouzadba/Notebook_Python_GPTW/blob/master/EngenheiroDeDados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## FAKER

In [2]:
!pip install pyspark faker datetime

Collecting faker
  Downloading faker-37.1.0-py3-none-any.whl.metadata (15 kB)
Collecting datetime
  Downloading DateTime-5.5-py3-none-any.whl.metadata (33 kB)
Collecting zope.interface (from datetime)
  Downloading zope.interface-7.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
Downloading faker-37.1.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m27.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading DateTime-5.5-py3-none-any.whl (52 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.6/52.6 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading zope.interface-7.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (259 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [3]:
from faker import Faker
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from datetime import datetime
# Import the PySpark functions module
from pyspark.sql import functions as F

In [4]:
# Module-level Faker instance; generate_data() reads it to produce fake values.
fake = Faker()

# Obtain the Spark session for this section, requested under the application
# name 'data_generator'.
spark = (
    SparkSession.builder
    .appName('data_generator')
    .getOrCreate()
)

def generate_data(n, seed=None):
    """Build ``n`` fake bank-account records as plain Python tuples.

    Each record is ``(name, balance, account_number, date)`` where the name,
    account number (BBAN) and date come from the module-level ``fake``
    instance and the balance is drawn with ``random.uniform(1000, 10000)``.
    The date is requested between one year ago (``'-1y'``) and ``'today'``,
    so the date window moves with the day the function is run.

    Args:
        n: Number of records to generate.
        seed: Optional seed. When given, the ``random`` module and the
            module-level ``fake`` instance are both re-seeded before any
            record is produced, so repeated calls with the same seed yield
            the same sequence of draws. The default ``None`` leaves both
            generators untouched, which is the original behaviour.

    Returns:
        A list with ``n`` tuples in the field order described above.
    """
    if seed is not None:
        # Balances come from ``random`` and every other field from ``fake``,
        # so both generators have to be seeded for a repeatable run.
        random.seed(seed)
        fake.seed_instance(seed)

    # Per row the calls happen in tuple order: name, balance, BBAN, date.
    return [
        (
            fake.name(),
            random.uniform(1000, 10000),  # balance is left unrounded here
            fake.bban(),
            fake.date_between(start_date='-1y', end_date='today'),
        )
        for _ in range(n)
    ]

# Column layout for the generated rows: every field is declared non-nullable
# and given an explicit Spark type instead of relying on inference.
schema = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in (
        ("name", StringType()),
        ("balance", DoubleType()),
        ("account_number", StringType()),
        ("date", DateType()),
    )
])

# Produce 10,000 fake records as Python tuples.
dados = generate_data(n=10000)

# Load the records into Spark using the schema above, then round the balance
# to two decimal places with Spark's own round function.
df = (
    spark.createDataFrame(data=dados, schema=schema)
    .withColumn("balance", F.round(F.col("balance"), 2))
)

# Preview the first five rows and report the total number of records.
df.show(5)
print(f"Total records: {df.count()}")

+-------------+-------+------------------+----------+
|         name|balance|    account_number|      date|
+-------------+-------+------------------+----------+
| Kelly Thomas|5931.54|PRCM99746494871625|2024-12-30|
|Rose Martinez| 2146.6|LHKO83983170102934|2024-09-08|
|   Leah Smith|6336.59|SCKA53642009328399|2024-07-10|
|Charles Lucas|5144.46|WJOQ70916287667092|2024-12-22|
|Sherry Morgan|5232.37|AKXG72798479364665|2025-03-28|
+-------------+-------+------------------+----------+
only showing top 5 rows

Total records: 10000


In [5]:
# Display the number of rows in df as this cell's output.
df.count()

10000

In [9]:
# Keep the ten rows with the largest balance, highest first, and display them.
df_ordenado = df.sort(F.desc("balance")).limit(10)
df_ordenado.show()

+----------------+-------+------------------+----------+
|            name|balance|    account_number|      date|
+----------------+-------+------------------+----------+
|   John Robinson| 9999.0|GYLE24324219877366|2025-03-28|
|   Daniel Fowler|9998.79|FRVN38513037137273|2025-03-11|
|  Alexander Chen|9998.29|RYWV73856690533196|2024-10-03|
|       Stacy Ray|9993.55|FQLS58224636577227|2024-10-25|
|       Troy Shaw|9993.36|HQIV35307719888054|2025-01-10|
|     Victor Wood|9993.13|WDHA20295124310499|2024-06-02|
|     Aaron Burns|9992.39|XYME88808566387375|2025-03-19|
|Michael Crawford|9990.64|HOFL31362905668136|2025-01-22|
|   Jennifer Choi|9990.09|YOAN63237192127549|2024-10-01|
|   Austin Willis|9988.23|CVIQ81493998646151|2024-08-10|
+----------------+-------+------------------+----------+



## ETL e ELT (SPARK CLUSTER)

In [10]:
# Request the Spark session for the sensor ETL/ELT section under the
# application name 'sojaSensores'.
# NOTE(review): getOrCreate() returns an already-running session when one
# exists, so the session started earlier in this notebook is presumably reused
# here — confirm whether the new app name actually takes effect.
spark = (
    SparkSession.builder
    .appName('sojaSensores')
    .getOrCreate()
)

In [19]:
# Read the sensor CSV, treating its first line as column names, and preview it.
spark.read.csv("sensores-iot.csv", header=True).show()

+---+-----------+-----------+--------+--------------------+-----------+-----------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|
|  5|sensor-2821|         15|      52|2000-10-27 23:32:...|-55.3025525| -26.657210|
|  6|sensor-9805|         12|      37|2022-08-09 22:08:...| -25.912786| 122.693490|
|  7|sensor-1230|         10|      60|1970-02-02 17:32:...|-88.9300035|  33.377804|
|  8|sensor-4472|         11|      75|2014-02-22 06:42:...|-89.0426855| 120.