<a href="https://colab.research.google.com/github/gabimalaspina/Case_Bradesco/blob/main/case_bradesco.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Case Bradesco - Processo seletivo Gabriela Malaspina**

#### Etapa 1

"Carregue as bases de dados como DataFrames do PySpark. Especifique os formatos dos dados manualmente, não use a opção inferSchema = True."


##### **1. Configuração do ambiente**

In [None]:
# Importação das bibliotecas e recursos

# Bibliotecas e recursos do PySpark
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, TimestampType
from pyspark.sql.functions import to_timestamp

# Biblioteca para gerenciamento dos diretórios
import os
from datetime import datetime

# Biblioteca para criação da database
import sqlite3

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=57efdfcc2f6839a604d8f80a1b34f2292e12335cd7918fa9bb25b6a227be7592
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# URL para download do driver JDBC SQLite
url = 'https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.36.0.3/sqlite-jdbc-3.36.0.3.jar'

# Fazer o download do arquivo JAR
!wget "$url"

--2024-03-21 13:13:12--  https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.36.0.3/sqlite-jdbc-3.36.0.3.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9731064 (9.3M) [application/java-archive]
Saving to: ‘sqlite-jdbc-3.36.0.3.jar’


2024-03-21 13:13:13 (96.2 MB/s) - ‘sqlite-jdbc-3.36.0.3.jar’ saved [9731064/9731064]



In [None]:
# Criação da SparkSession
spark = (
    SparkSession.builder
    .appName("dados_har")
    .config("spark.driver.extraClassPath", "/content/sqlite-jdbc-3.36.0.3.jar")
    .getOrCreate()
)

##### **2. Configuração de diretórios, download e extração**

In [None]:
# Criação de subpastas dentro da pasta "content" para simular as camadas "raw" e "bronze"

raw = '/content/raw'
os.mkdir(raw)

bronze = '/content/bronze'
os.mkdir(bronze)

In [None]:
# Download e extração do arquivo .zip na camada "raw" (dados brutos)

# Link para download
url='https://archive.ics.uci.edu/ml/machine-learning-databases/00344/Activity%20recognition%20exp.zip'

# Mudança para a pasta "raw"
os.chdir(raw)

# Download do .zip com o nome "activity_rec_exp.zip" na camada "raw"
!wget -O activity_recognition_exp.zip "$url"

# Extração do conteúdo do .zip na camada "raw"
!unzip -j -d activity_recognition_exp /content/raw/activity_recognition_exp.zip

--2024-03-21 13:13:21--  https://archive.ics.uci.edu/ml/machine-learning-databases/00344/Activity%20recognition%20exp.zip
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘activity_recognition_exp.zip’

activity_recognitio     [   <=>              ] 741.13M  42.7MB/s    in 63s     

2024-03-21 13:14:25 (11.7 MB/s) - ‘activity_recognition_exp.zip’ saved [777127275]

Archive:  /content/raw/activity_recognition_exp.zip
  inflating: activity_recognition_exp/.DS_Store  
  inflating: activity_recognition_exp/._.DS_Store  
  inflating: activity_recognition_exp/Phones_accelerometer.csv  
  inflating: activity_recognition_exp/Phones_gyroscope.csv  
  inflating: activity_recognition_exp/readme.txt  
  inflating: activity_recognition_exp/._readme.txt  
  inflating: activity_recognition_exp/Watch_accelerometer.c

##### **3. Leitura dos arquivos .csv e configuração dos dataframes**

Os 4 arquivos possuem a mesma estrutura de atributos:
- Index > identificador da amostra (id): INT
- Arrival_Time e Creation_Time > momento de chegada e criação das amostras: DATETIME / TIMESTAMP
- x, y e z > representam leituras de acelerômetro/giroscópio: FLOAT
- User > usuário: STRING
- Model e Device > modelo e nome do aparelho: STRING
- gt > "groundtruth" - referência de posição: STRING

Dessa forma, pode-se utilizar a mesma tipagem para o schema em todos os datasets, assim como a formatação para as colunas de datas.


In [None]:
# Definição do Schema a ser utilizado em todos os datasets

schema = StructType([
    StructField('Index', IntegerType(), True),
    StructField('Arrival_Time', StringType(), True), # Será feita transformação para formato de data na sequência
    StructField('Creation_Time', StringType(), True), # Será feita transformação para formato de data na sequência
    StructField('x', FloatType(), True),
    StructField('y', FloatType(), True),
    StructField('z', FloatType(), True),
    StructField('User', StringType(), True),
    StructField('Model', StringType(), True),
    StructField('Device', StringType(), True),
    StructField('gt', StringType(), True)
])

###### Arquivo 01- Acelerômetro de celulares

In [None]:
caminho_01 = '/content/raw/activity_recognition_exp/Phones_accelerometer.csv'

# Dataframe inicial
phones_acc = spark.read.csv(caminho_01, header=True).show(5)

+-----+-------------+-------------------+------------------+------------------+--------+----+------+--------+-----+
|Index| Arrival_Time|      Creation_Time|                 x|                 y|       z|User| Model|  Device|   gt|
+-----+-------------+-------------------+------------------+------------------+--------+----+------+--------+-----+
|    0|1424696633908|1424696631913248572|         -5.958191|         0.6880646|8.135345|   a|nexus4|nexus4_1|stand|
|    1|1424696633909|1424696631918283972|          -5.95224|         0.6702118|8.136536|   a|nexus4|nexus4_1|stand|
|    2|1424696633918|1424696631923288855|        -5.9950867|0.6535491999999999|8.204376|   a|nexus4|nexus4_1|stand|
|    3|1424696633919|1424696631928385290|        -5.9427185|0.6761626999999999|8.128204|   a|nexus4|nexus4_1|stand|
|    4|1424696633929|1424696631933420691|-5.991516000000001|        0.64164734|8.135345|   a|nexus4|nexus4_1|stand|
+-----+-------------+-------------------+------------------+------------

In [None]:
# Aplicação do schema e transformações das datas

phones_acc = spark.read.csv(caminho_01, schema=schema, header=True)

# Conversão de Arrival_Time (em milissegundos)
phones_acc = phones_acc.withColumn("Arrival_Time", (phones_acc["Arrival_Time"] / 1000).cast("timestamp"))

# Conversão de Creation_Time (em nanossegundos)
phones_acc = phones_acc.withColumn("Creation_Time", (phones_acc["Creation_Time"] / 1e9).cast("timestamp"))

In [None]:
# Dataframe formatado
phones_acc.show(5)

+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
|Index|        Arrival_Time|       Creation_Time|         x|         y|       z|User| Model|  Device|   gt|
+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
|    0|2015-02-23 13:03:...|2015-02-23 13:03:...| -5.958191| 0.6880646|8.135345|   a|nexus4|nexus4_1|stand|
|    1|2015-02-23 13:03:...|2015-02-23 13:03:...|  -5.95224| 0.6702118|8.136536|   a|nexus4|nexus4_1|stand|
|    2|2015-02-23 13:03:...|2015-02-23 13:03:...|-5.9950867| 0.6535492|8.204376|   a|nexus4|nexus4_1|stand|
|    3|2015-02-23 13:03:...|2015-02-23 13:03:...|-5.9427185| 0.6761627|8.128204|   a|nexus4|nexus4_1|stand|
|    4|2015-02-23 13:03:...|2015-02-23 13:03:...| -5.991516|0.64164734|8.135345|   a|nexus4|nexus4_1|stand|
+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
only showing top 5 rows



In [None]:
# Verificação do Schema estruturado
phones_acc.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Arrival_Time: timestamp (nullable = true)
 |-- Creation_Time: timestamp (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)
 |-- User: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Device: string (nullable = true)
 |-- gt: string (nullable = true)



###### Arquivo 02- Giroscópio de celulares

In [None]:
caminho_02 = '/content/raw/activity_recognition_exp/Phones_gyroscope.csv'

# Dataframe inicial
phones_gyr = spark.read.csv(caminho_02, header=True).show(5)

+-----+-------------+-------------------+--------------------+--------------------+------------+----+------+--------+-----+
|Index| Arrival_Time|      Creation_Time|                   x|                   y|           z|User| Model|  Device|   gt|
+-----+-------------+-------------------+--------------------+--------------------+------------+----+------+--------+-----+
|    0|1424696633909|1424696631914042029|         0.013748169|-0.00062561035000...|-0.023376465|   a|nexus4|nexus4_1|stand|
|    1|1424696633909|1424696631919046912|0.014816283999999999|       -0.0016937256| -0.02230835|   a|nexus4|nexus4_1|stand|
|    2|1424696633918|1424696631924051794|           0.0158844|       -0.0016937256|-0.021240234|   a|nexus4|nexus4_1|stand|
|    3|1424696633919|1424696631929117712|         0.016952515|        -0.003829956| -0.02017212|   a|nexus4|nexus4_1|stand|
|    4|1424696633928|1424696631934214148|           0.0158844|-0.00703430180000...| -0.02017212|   a|nexus4|nexus4_1|stand|
+-----+-

In [None]:
# Aplicação do schema e transformações previamente definidos e validados na etapa anterior

phones_gyr = spark.read.csv(caminho_02, schema=schema, header=True)

# Conversão de Arrival_Time (em milissegundos)
phones_gyr = phones_gyr.withColumn("Arrival_Time", (phones_gyr["Arrival_Time"] / 1000).cast("timestamp"))

# Conversão de Creation_Time (em nanossegundos)
phones_gyr = phones_gyr.withColumn("Creation_Time", (phones_gyr["Creation_Time"] / 1e9).cast("timestamp"))

In [None]:
# Dataframe formatado
phones_gyr.show(5)

+-----+--------------------+--------------------+-----------+-------------+------------+----+------+--------+-----+
|Index|        Arrival_Time|       Creation_Time|          x|            y|           z|User| Model|  Device|   gt|
+-----+--------------------+--------------------+-----------+-------------+------------+----+------+--------+-----+
|    0|2015-02-23 13:03:...|2015-02-23 13:03:...|0.013748169|-6.2561035E-4|-0.023376465|   a|nexus4|nexus4_1|stand|
|    1|2015-02-23 13:03:...|2015-02-23 13:03:...|0.014816284|-0.0016937256| -0.02230835|   a|nexus4|nexus4_1|stand|
|    2|2015-02-23 13:03:...|2015-02-23 13:03:...|  0.0158844|-0.0016937256|-0.021240234|   a|nexus4|nexus4_1|stand|
|    3|2015-02-23 13:03:...|2015-02-23 13:03:...|0.016952515| -0.003829956| -0.02017212|   a|nexus4|nexus4_1|stand|
|    4|2015-02-23 13:03:...|2015-02-23 13:03:...|  0.0158844|-0.0070343018| -0.02017212|   a|nexus4|nexus4_1|stand|
+-----+--------------------+--------------------+-----------+-----------

In [None]:
# Verificação do Schema estruturado
phones_gyr.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Arrival_Time: timestamp (nullable = true)
 |-- Creation_Time: timestamp (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)
 |-- User: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Device: string (nullable = true)
 |-- gt: string (nullable = true)



###### Arquivo 03- Acelerômetro de relógios

In [None]:
caminho_03 = '/content/raw/activity_recognition_exp/Watch_accelerometer.csv'

# Dataframe inicial
watch_acc = spark.read.csv(caminho_03, header=True).show(5)

+-----+-------------+--------------+-----------+----------+-----------+----+-----+------+-----+
|Index| Arrival_Time| Creation_Time|          x|         y|          z|User|Model|Device|   gt|
+-----+-------------+--------------+-----------+----------+-----------+----+-----+------+-----+
|    0|1424696638740|27920678471000| -0.5650316| -9.572019|-0.61411273|   a| gear|gear_1|stand|
|    1|1424696638740|27920681910000|-0.83258367| -9.713276|-0.60693014|   a| gear|gear_1|stand|
|    2|1424696638740|27920692014000| -1.0181342| -9.935339|-0.54408234|   a| gear|gear_1|stand|
|    3|1424696638741|27920701983000| -1.2228385|-10.142437| -0.5662287|   a| gear|gear_1|stand|
|    4|1424696638741|27920711906000| -1.5771804|-10.480618|-0.40282443|   a| gear|gear_1|stand|
+-----+-------------+--------------+-----------+----------+-----------+----+-----+------+-----+
only showing top 5 rows



In [None]:
# Aplicação do schema e transformações previamente definidos e validados nas etapas anteriores

watch_acc = spark.read.csv(caminho_03, schema=schema, header=True)

# Conversão de Arrival_Time (em milissegundos)
watch_acc = watch_acc.withColumn("Arrival_Time", (watch_acc["Arrival_Time"] / 1000).cast("timestamp"))

# Conversão de Creation_Time (em nanossegundos)
watch_acc = watch_acc.withColumn("Creation_Time", (watch_acc["Creation_Time"] / 1e9).cast("timestamp"))

In [None]:
# Dataframe formatado
watch_acc.show(5)

+-----+--------------------+--------------------+-----------+----------+-----------+----+-----+------+-----+
|Index|        Arrival_Time|       Creation_Time|          x|         y|          z|User|Model|Device|   gt|
+-----+--------------------+--------------------+-----------+----------+-----------+----+-----+------+-----+
|    0|2015-02-23 13:03:...|1970-01-01 07:45:...| -0.5650316| -9.572019|-0.61411273|   a| gear|gear_1|stand|
|    1|2015-02-23 13:03:...|1970-01-01 07:45:...|-0.83258367| -9.713276|-0.60693014|   a| gear|gear_1|stand|
|    2|2015-02-23 13:03:...|1970-01-01 07:45:...| -1.0181342| -9.935339|-0.54408234|   a| gear|gear_1|stand|
|    3|2015-02-23 13:03:...|1970-01-01 07:45:...| -1.2228385|-10.142437| -0.5662287|   a| gear|gear_1|stand|
|    4|2015-02-23 13:03:...|1970-01-01 07:45:...| -1.5771804|-10.480618|-0.40282443|   a| gear|gear_1|stand|
+-----+--------------------+--------------------+-----------+----------+-----------+----+-----+------+-----+
only showing top 5 

In [None]:
# Verificação do Schema estruturado
watch_acc.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Arrival_Time: timestamp (nullable = true)
 |-- Creation_Time: timestamp (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)
 |-- User: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Device: string (nullable = true)
 |-- gt: string (nullable = true)



###### Arquivo 04- Giroscópio de relógios

In [None]:
caminho_04 = '/content/raw/activity_recognition_exp/Watch_gyroscope.csv'

# Dataframe inicial
watch_gyr = spark.read.csv(caminho_04, header=True).show(5)

+-----+-------------+--------------+-----------+------------+------------+----+-----+------+-----+
|Index| Arrival_Time| Creation_Time|          x|           y|           z|User|Model|Device|   gt|
+-----+-------------+--------------+-----------+------------+------------+----+-----+------+-----+
|    0|1424696638743|27920678496000|-0.16218652|-0.022104237|  0.05965481|   a| gear|gear_1|stand|
|    1|1424696638743|27920681926000|-0.18322548| -0.06178534| 0.012516857|   a| gear|gear_1|stand|
|    2|1424696638743|27920692031000|-0.18082865| -0.10865697|-0.036485307|   a| gear|gear_1|stand|
|    3|1424696638743|27920701997000|-0.14780544| -0.15792546| -0.09853696|   a| gear|gear_1|stand|
|    7|1424696638744|27920743068000| 0.18216023| -0.32357407| -0.27723506|   a| gear|gear_1|stand|
+-----+-------------+--------------+-----------+------------+------------+----+-----+------+-----+
only showing top 5 rows



In [None]:
# Aplicação do schema e transformações previamente definidos e validados nas etapas anteriores

watch_gyr = spark.read.csv(caminho_02, schema=schema, header=True)

# Conversão de Arrival_Time (em milissegundos)
watch_gyr = watch_gyr.withColumn("Arrival_Time", (watch_gyr["Arrival_Time"] / 1000).cast("timestamp"))

# Conversão de Creation_Time (em nanossegundos)
watch_gyr = watch_gyr.withColumn("Creation_Time", (watch_gyr["Creation_Time"] / 1e9).cast("timestamp"))

In [None]:
# Dataframe formatado
watch_gyr.show(5)

+-----+--------------------+--------------------+-----------+-------------+------------+----+------+--------+-----+
|Index|        Arrival_Time|       Creation_Time|          x|            y|           z|User| Model|  Device|   gt|
+-----+--------------------+--------------------+-----------+-------------+------------+----+------+--------+-----+
|    0|2015-02-23 13:03:...|2015-02-23 13:03:...|0.013748169|-6.2561035E-4|-0.023376465|   a|nexus4|nexus4_1|stand|
|    1|2015-02-23 13:03:...|2015-02-23 13:03:...|0.014816284|-0.0016937256| -0.02230835|   a|nexus4|nexus4_1|stand|
|    2|2015-02-23 13:03:...|2015-02-23 13:03:...|  0.0158844|-0.0016937256|-0.021240234|   a|nexus4|nexus4_1|stand|
|    3|2015-02-23 13:03:...|2015-02-23 13:03:...|0.016952515| -0.003829956| -0.02017212|   a|nexus4|nexus4_1|stand|
|    4|2015-02-23 13:03:...|2015-02-23 13:03:...|  0.0158844|-0.0070343018| -0.02017212|   a|nexus4|nexus4_1|stand|
+-----+--------------------+--------------------+-----------+-----------

In [None]:
# Verificação do Schema estruturado
watch_gyr.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Arrival_Time: timestamp (nullable = true)
 |-- Creation_Time: timestamp (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)
 |-- User: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Device: string (nullable = true)
 |-- gt: string (nullable = true)



##### **4. Escrita dos daframes em arquivos parquet**

Para preservar as transformações realizadas, e otimizar a análise posterior, os dados serão escrito no formato .parquet.

In [None]:
phones_acc.show(5)

+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
|Index|        Arrival_Time|       Creation_Time|         x|         y|       z|User| Model|  Device|   gt|
+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
|    0|2015-02-23 13:03:...|2015-02-23 13:03:...| -5.958191| 0.6880646|8.135345|   a|nexus4|nexus4_1|stand|
|    1|2015-02-23 13:03:...|2015-02-23 13:03:...|  -5.95224| 0.6702118|8.136536|   a|nexus4|nexus4_1|stand|
|    2|2015-02-23 13:03:...|2015-02-23 13:03:...|-5.9950867| 0.6535492|8.204376|   a|nexus4|nexus4_1|stand|
|    3|2015-02-23 13:03:...|2015-02-23 13:03:...|-5.9427185| 0.6761627|8.128204|   a|nexus4|nexus4_1|stand|
|    4|2015-02-23 13:03:...|2015-02-23 13:03:...| -5.991516|0.64164734|8.135345|   a|nexus4|nexus4_1|stand|
+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
only showing top 5 rows



In [None]:
# Escrita dos dataframes no formato .parquet e armazenamento na camada "bronze"

phones_acc.write.save("/content/bronze/phones_acc.parquet", format="parquet", mode="overwrite")
phones_gyr.write.save("/content/bronze/phones_gyr.parquet", format="parquet", mode="overwrite")
watch_acc.write.save("/content/bronze/watch_acc.parquet", format="parquet", mode="overwrite")
watch_gyr.write.save("/content/bronze/watch_gyr.parquet", format="parquet", mode="overwrite")


In [None]:
# Leitura de um arquivo .parquet, para verificar se as informações foram armazenadas corretamente na escrita

df = (
    spark
    .read
    .option('format', 'parquet')
    .option('header', 'true')
    .load('/content/bronze/phones_acc.parquet')
)

df.show(5)

+------+--------------------+--------------------+--------+--------+--------+----+----------+------------+----+
| Index|        Arrival_Time|       Creation_Time|       x|       y|       z|User|     Model|      Device|  gt|
+------+--------------------+--------------------+--------+--------+--------+----+----------+------------+----+
|107066|2015-02-23 13:34:...|1970-01-01 03:23:...|5.669399|0.153227|8.887166|   a|samsungold|samsungold_2|bike|
|107067|2015-02-23 13:34:...|1970-01-01 03:23:...|5.822626|0.306454|8.733939|   a|samsungold|samsungold_2|bike|
|107068|2015-02-23 13:34:...|1970-01-01 03:23:...|5.975853|0.153227|8.580712|   a|samsungold|samsungold_2|bike|
|107069|2015-02-23 13:34:...|1970-01-01 03:23:...| 6.12908|0.153227|8.733939|   a|samsungold|samsungold_2|bike|
|107070|2015-02-23 13:34:...|1970-01-01 03:23:...| 6.12908|0.153227|8.887166|   a|samsungold|samsungold_2|bike|
+------+--------------------+--------------------+--------+--------+--------+----+----------+-----------

In [None]:
# Leitura de todos os arquivos e conversão em dataframe, para uso nas etapas posteriores

phones_acc_parquet = (
    spark
    .read
    .option('format', 'parquet')
    .option('header', 'true')
    .load('/content/bronze/phones_acc.parquet')
)

phones_gyr_parquet = (
    spark
    .read
    .option('format', 'parquet')
    .option('header', 'true')
    .load('/content/bronze/phones_gyr.parquet')
)

watch_acc_parquet = (
    spark
    .read
    .option('format', 'parquet')
    .option('header', 'true')
    .load('/content/bronze/watch_acc.parquet')
)

watch_gyr_parquet = (
    spark
    .read
    .option('format', 'parquet')
    .option('header', 'true')
    .load('/content/bronze/watch_gyr.parquet')
)

##### **Resultado final etapa 1**

Dataframes PySpark com schemas formatados manualmente, com base na documentação.

In [None]:
# Acelerômetro de celulares
phones_acc.show(5)
# Giroscópio de celulares
phones_gyr.show(5)
# Acelerômetro de relógios
watch_acc.show(5)
# Giroscópio de relógios
watch_gyr.show(5)

+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
|Index|        Arrival_Time|       Creation_Time|         x|         y|       z|User| Model|  Device|   gt|
+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
|    0|2015-02-23 13:03:...|2015-02-23 13:03:...| -5.958191| 0.6880646|8.135345|   a|nexus4|nexus4_1|stand|
|    1|2015-02-23 13:03:...|2015-02-23 13:03:...|  -5.95224| 0.6702118|8.136536|   a|nexus4|nexus4_1|stand|
|    2|2015-02-23 13:03:...|2015-02-23 13:03:...|-5.9950867| 0.6535492|8.204376|   a|nexus4|nexus4_1|stand|
|    3|2015-02-23 13:03:...|2015-02-23 13:03:...|-5.9427185| 0.6761627|8.128204|   a|nexus4|nexus4_1|stand|
|    4|2015-02-23 13:03:...|2015-02-23 13:03:...| -5.991516|0.64164734|8.135345|   a|nexus4|nexus4_1|stand|
+-----+--------------------+--------------------+----------+----------+--------+----+------+--------+-----+
only showing top 5 rows

+--

#### Etapa 02
"Faça uma análise inicial dos dados: quais problemas você encontrou? Como você trataria
tais problemas?"


O conjunto de dados em questão, Heterogeneity Human Activity Recognition (HHAR), é um conjunto de dados projetado para benchmarking de algoritmos de reconhecimento de atividade humana. Os registros são compostos por leituras de sensores de movimento (acelerômetros e giroscópios) presentes em smartphones e smartwatches selecionados.

A partir da análise inicial dos dados, alguns problemas potenciais podem ser identificados:
- Compreensão do assunto e interpretação dos resultados;
- Volumetria dos dados;
- Dados faltantes;
- Qualidade dos dados;






##### Análises iniciais

In [None]:
# Volumetria - Quantidade de registros em cada dataframe

# Acelerômetro de celulares
vol1 = phones_acc_parquet.count()
print(f'Quantidade de linhas "Acelerômetro de celulares": {vol1}')

# Giroscópio de celulares
vol2 = phones_gyr_parquet.count()
print(f'Quantidade de linhas "Giroscópio de celulares": {vol2}')

# Acelerômetro de relógios
vol3 = watch_acc_parquet.count()
print(f'Quantidade de linhas "Acelerômetro de relógios": {vol3}')

# Giroscópio de relógios
vol4 = watch_gyr_parquet.count()
print(f'Quantidade de linhas "Giroscópio de relógios": {vol4}')

Quantidade de linhas "Acelerômetro de celulares": 13062475
Quantidade de linhas "Giroscópio de celulares": 13932632
Quantidade de linhas "Acelerômetro de relógios": 3540962
Quantidade de linhas "Giroscópio de relógios": 13932632


In [None]:
# Contagem de valores nulos por coluna

from pyspark.sql.functions import col, sum

# Acelerômetro de celulares
null_counts1 = phones_acc_parquet.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts1.show()
# Giroscópio de celulares
null_counts2 = phones_gyr_parquet.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts2.show()
# Acelerômetro de relógios
null_counts3 = watch_acc_parquet.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts3.show()
# Giroscópio de relógios
null_counts4 = watch_gyr_parquet.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts4.show()


+-----+------------+-------------+---+---+---+----+-----+------+---+
|Index|Arrival_Time|Creation_Time|  x|  y|  z|User|Model|Device| gt|
+-----+------------+-------------+---+---+---+----+-----+------+---+
|    0|           0|            0|  0|  0|  0|   0|    0|     0|  0|
+-----+------------+-------------+---+---+---+----+-----+------+---+

+-----+------------+-------------+---+---+---+----+-----+------+---+
|Index|Arrival_Time|Creation_Time|  x|  y|  z|User|Model|Device| gt|
+-----+------------+-------------+---+---+---+----+-----+------+---+
|    0|           0|            0|  0|  0|  0|   0|    0|     0|  0|
+-----+------------+-------------+---+---+---+----+-----+------+---+

+-----+------------+-------------+---+---+---+----+-----+------+---+
|Index|Arrival_Time|Creation_Time|  x|  y|  z|User|Model|Device| gt|
+-----+------------+-------------+---+---+---+----+-----+------+---+
|    0|           0|            0|  0|  0|  0|   0|    0|     0|  0|
+-----+------------+------------

#### Etapa 03

"Crie uma tabela com informações sumarizadas de cada usuário, como: quantos registros
tal usuário possui, quais modelos de aparelho cada usuário operou, etc."

Para executar essa etapa, serão utilizados como fonte os arquivos .parquet, para otimização do processamento.

Primeiro, será feita a leitura dos arquivos .parquet e criação de views temporárias, para utilização do Spark.SQL

In [None]:
# Passo 1. Criação das views temporárias de cada dataframe obtido na Etapa 1 - phones_acc, phones_gyr, watch_acc e watch_gyr

# (obs: como a leitura dos aquivos .parquet já foi feita na etapa 1, vou utilizar os df já criados para criar as views)

# Views dos dados de celulares
phones_acc_parquet.createOrReplaceTempView('view_phones_acc')
phones_gyr_parquet.createOrReplaceTempView('view_phones_gyr')

# Views dos dados de relógios
watch_acc_parquet.createOrReplaceTempView('view_watch_acc')
watch_gyr_parquet.createOrReplaceTempView('view_watch_gyr')

In [None]:
# Passo 2. Seleção das colunas que irão para a tabela unificada

# Selecionar apenas as colunas necessárias antes de unir os DataFrames
phones_acc_temp = phones_acc_parquet.select('Index', 'User', 'Model', 'Device', 'gt')
phones_gyr_temp = phones_acc_parquet.select('Index', 'User', 'Model', 'Device', 'gt')
watch_acc_temp = phones_acc_parquet.select('Index', 'User', 'Model', 'Device', 'gt')
watch_gyr_temp = phones_acc_parquet.select('Index', 'User', 'Model', 'Device', 'gt')

In [None]:
# Passo 3. Criação de uma coluna 'Source' para identificar a origem de cada DataFrame, preenchendo com os nomes especificados

from pyspark.sql.functions import lit

phones_acc_temp = phones_acc_temp.withColumn('Source', lit('phones_acc'))
phones_gyr_temp = phones_gyr_temp.withColumn('Source', lit('phones_gyr'))
watch_acc_temp = watch_acc_temp.withColumn('Source', lit('watch_acc'))
watch_gyr_temp = watch_gyr_temp.withColumn('Source', lit('watch_gyr'))

In [None]:
# Passo 4. Junção dos dados

user_database = phones_acc_temp.union(phones_gyr_temp).union(watch_acc_temp).union(watch_gyr_temp)

# Criação de tabela temporária para usar Spark SQL

user_database.createOrReplaceTempView('view_user_database')

In [None]:
user_database.count()

52249900

In [None]:
user_database.show(5)

+------+----+----------+------------+----+----------+
| Index|User|     Model|      Device|  gt|    Source|
+------+----+----------+------------+----+----------+
|107066|   a|samsungold|samsungold_2|bike|phones_acc|
|107067|   a|samsungold|samsungold_2|bike|phones_acc|
|107068|   a|samsungold|samsungold_2|bike|phones_acc|
|107069|   a|samsungold|samsungold_2|bike|phones_acc|
|107070|   a|samsungold|samsungold_2|bike|phones_acc|
+------+----+----------+------------+----+----------+
only showing top 5 rows



In [None]:
# Contagem do número total de usuários

from pyspark.sql.functions import countDistinct

total_users = user_database.select(countDistinct("User")).collect()[0][0]

print("Total de usuários:", total_users)

Total de usuários: 9


In [None]:
print(f'Base unificada:\n Usuários distintos: {total_users} \n Total de registros: {user_database.count()}')

Base unificada:
 Usuários distintos: 9 
 Total de registros: 52249900


In [None]:
# Passo 5. Consulta e agregações dos dados agrupados por usuário

# Agregação: Contagem de registros por usuário
user_records = spark.sql("""
    SELECT User, COUNT(*) AS Total_Registros
    FROM view_user_database
    GROUP BY User
    ORDER BY User
""")

# Agregação: Listagem dos modelos de aparelho por usuário e contagem de modelos distintos
user_device_models = spark.sql("""
    SELECT User, COLLECT_SET(Model) AS Device_Models, COUNT(DISTINCT Model) AS Total_Modelos
    FROM view_user_database
    GROUP BY User
    ORDER BY User
""")

In [None]:
# Exibição dos resultados
user_records.show()
user_device_models.show()

+----+---------------+
|User|Total_Registros|
+----+---------------+
|   a|        5450080|
|   b|        6195072|
|   c|        5307192|
|   d|        5348496|
|   e|        6459996|
|   f|        5538124|
|   g|        6350788|
|   h|        5369204|
|   i|        6230948|
+----+---------------+

+----+--------------------+-------------+
|User|       Device_Models|Total_Modelos|
+----+--------------------+-------------+
|   a|[nexus4, s3mini, ...|            4|
|   b|[nexus4, s3mini, ...|            4|
|   c|[nexus4, s3mini, ...|            4|
|   d|[nexus4, s3mini, ...|            4|
|   e|[nexus4, s3mini, ...|            4|
|   f|[nexus4, s3mini, ...|            4|
|   g|[nexus4, s3mini, ...|            4|
|   h|[nexus4, s3mini, ...|            4|
|   i|[nexus4, s3mini, ...|            4|
+----+--------------------+-------------+



In [None]:
spark.sql("SHOW TABLES").show()

+---------+------------------+-----------+
|namespace|         tableName|isTemporary|
+---------+------------------+-----------+
|         |   view_phones_acc|       true|
|         |   view_phones_gyr|       true|
|         |view_user_database|       true|
|         |    view_watch_acc|       true|
|         |    view_watch_gyr|       true|
+---------+------------------+-----------+

