# Step-by-Step PySpark Analysis

This notebook walks through loading the Parquet files from `batch_001` and `batch_002` and combining them into a single PySpark `DataFrame`.

## 1. Prerequisites
- Run `pip install -r requirements.txt` to ensure `pyspark` and `pyarrow` are available.
- Start the `docker compose` stack to bring MinIO online if you plan to push data to object storage later.
- This notebook assumes the files live in the lesson directory `data_test`.
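
Before starting Spark, it can help to confirm that the packages named above are importable in the active environment. The following is a minimal sketch using only the standard library; the helper name `missing_packages` is hypothetical and not part of the lesson code.

```python
from importlib.util import find_spec

def missing_packages(names):
    """Return the subset of top-level package `names` that cannot be imported here."""
    return [name for name in names if find_spec(name) is None]

# Package names taken from the prerequisites above.
missing = missing_packages(["pyspark", "pyarrow"])
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are importable.")
```

If anything is reported as missing, rerunning `pip install -r requirements.txt` in the same environment as the notebook kernel is the first thing to try.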

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Advanced-EDA-Step-By-Step')
    .config('spark.sql.session.timeZone', 'UTC')
    .getOrCreate()
)

spark

## 2. Defining working paths
Organize references for each batch directory.

In [None]:
from pathlib import Path

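# Relative path as it appears in the recorded output below; it resolves against the working directory.
DATA_ROOT = Path('instructions') / '20251020_advanced_exploratory_data_analysis' / 'data_test'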
BATCH_001 = DATA_ROOT / 'batch_001'
BATCH_002 = DATA_ROOT / 'batch_002'

print('Batch 1 ->', BATCH_001.resolve())
print('Batch 2 ->', BATCH_002.resolve())

Batch 1 -> C:\Users\Inteli\Documents\GitHub\20254si08\instructions\20251020_advanced_exploratory_data_analysis\data_test\batch_001
Batch 2 -> C:\Users\Inteli\Documents\GitHub\20254si08\instructions\20251020_advanced_exploratory_data_analysis\data_test\batch_002
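
A relative `Path` is resolved against the current working directory, so the locations printed above depend on where the notebook kernel was started. One way to make the lookup less fragile, sketched here as an assumption rather than as the lesson's own approach, is to walk up from the working directory until the relative folder is found. The helper name `find_upwards` is hypothetical.

```python
from pathlib import Path

def find_upwards(relative, start=None):
    """Search `start` and each of its parents for `relative`; return the first match."""
    start = Path(start) if start is not None else Path.cwd()
    start = start.resolve()
    for base in [start, *start.parents]:
        candidate = base / relative
        if candidate.is_dir():
            return candidate
    raise FileNotFoundError(f"{relative} not found in {start} or any parent directory")
```

For example, `find_upwards(DATA_ROOT)` could replace the relative `DATA_ROOT` before the batch paths are built, so the notebook works whether it is launched from the repository root or from a subfolder.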


## 3. Inspecting available tables
List the subfolders (each represents one Parquet table).

In [3]:
tables = sorted([p.name for p in BATCH_001.iterdir() if p.is_dir()])
tables

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'instructions\\20251020_advanced_exploratory_data_analysis\\data_test\\batch_001'
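
Because each table is later read from both batches, it can be useful to know which table folders exist in only one of them. The following is a minimal sketch, assuming the folder-per-table layout described above; the function names `list_tables` and `compare_batches` are hypothetical.

```python
from pathlib import Path

def list_tables(batch_dir):
    """Return the names of the immediate subfolders of `batch_dir` as a set."""
    batch_dir = Path(batch_dir)
    if not batch_dir.is_dir():
        return set()  # a missing batch folder is treated as having no tables
    return {p.name for p in batch_dir.iterdir() if p.is_dir()}

def compare_batches(batch_a, batch_b):
    """Split table names into those shared by both batches and those unique to each."""
    a, b = list_tables(batch_a), list_tables(batch_b)
    return {
        "both": sorted(a & b),
        "only_first": sorted(a - b),
        "only_second": sorted(b - a),
    }
```

In the notebook this would be called as `compare_batches(BATCH_001, BATCH_002)`. Returning an empty set for a missing folder avoids the `FileNotFoundError` shown above, at the cost of hiding a misconfigured path, so checking `BATCH_001.is_dir()` first may still be worthwhile.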

## 4. Loading a specific table
Use `efentradas` as the running example: load each batch and then union the data.

In [None]:
table_name = 'efentradas'

batch1_df = spark.read.parquet(str(BATCH_001 / table_name))
batch2_df = spark.read.parquet(str(BATCH_002 / table_name))

combined_df = batch1_df.unionByName(batch2_df, allowMissingColumns=True)

combined_df.printSchema()
combined_df.show(5)
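
With `allowMissingColumns=True`, `unionByName` fills columns that are absent from one side with nulls, so schema drift between batches can pass unnoticed. A small comparison of the two schemas before the union makes such differences visible. The sketch below works on the `(name, type)` pairs returned by `DataFrame.dtypes`; the helper `schema_diff` is hypothetical, and the sample column names are invented for illustration.

```python
def schema_diff(dtypes_a, dtypes_b):
    """Compare two lists of (column, type) pairs, as returned by DataFrame.dtypes."""
    a, b = dict(dtypes_a), dict(dtypes_b)
    return {
        "only_first": sorted(set(a) - set(b)),
        "only_second": sorted(set(b) - set(a)),
        # Columns present on both sides but with different types.
        "type_mismatch": sorted(
            (col, a[col], b[col]) for col in set(a) & set(b) if a[col] != b[col]
        ),
    }

# Invented sample schemas, only to show the shape of the result.
example = schema_diff(
    [("id", "bigint"), ("valor", "double")],
    [("id", "int"), ("valor", "double"), ("origem", "string")],
)
print(example)
```

In the notebook this would be called as `schema_diff(batch1_df.dtypes, batch2_df.dtypes)`; three empty lists mean the two batches share the same columns and types.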

## 5. Utility function to combine tables
Build a helper that returns a merged `DataFrame` for any table.

In [None]:
from pyspark.sql import DataFrame

def load_table(table: str) -> DataFrame:
    """Read `table` from every batch that contains it and union the results by column name."""
    paths = [BATCH_001 / table, BATCH_002 / table]
    dataframes = []
    for path in paths:
        if not path.exists():
            print(f'Warning: {path} not found; skipping this batch.')
            continue
        dataframes.append(spark.read.parquet(str(path)))

    if not dataframes:
        raise FileNotFoundError(f'No data found for table {table}.')

    # Columns missing from one batch are filled with nulls.
    merged = dataframes[0]
    for df in dataframes[1:]:
        merged = merged.unionByName(df, allowMissingColumns=True)
    return merged
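
The merge loop at the end of `load_table` is a left fold over the list of batches, which `functools.reduce` can express directly. The following sketch is an alternative formulation, not a change the notebook requires; `union_all` is a hypothetical name, and it assumes each element exposes `unionByName` as PySpark `DataFrame` objects do.

```python
from functools import reduce

def union_all(dataframes):
    """Fold a non-empty list of DataFrames into one with unionByName."""
    if not dataframes:
        raise ValueError("union_all needs at least one DataFrame")
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dataframes,
    )
```

This form extends naturally if more batch directories are added later, since only the list of inputs changes.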


## 6. Applying the helper across tables
Load a few tables and review counts to make sure the union worked.

In [None]:
summary = []
for table in ['efentradas', 'efsaidas', 'ctlancto']:
    df = load_table(table)
    summary.append((table, df.count()))

summary

## 7. Exploratory profiling for `ctcontas`
Focus on the `ctcontas` (chart of accounts) table to validate that both batches stitch together correctly and to capture quick business signals.

In [None]:
from pyspark.sql import functions as F

table = 'ctcontas'

batch1_ct = spark.read.parquet(str(BATCH_001 / table))
batch2_ct = spark.read.parquet(str(BATCH_002 / table))

print(f"Batch 001 rows: {batch1_ct.count()}")
print(f"Batch 002 rows: {batch2_ct.count()}")

ctcontas_df = load_table(table).cache()
print(f"Combined rows: {ctcontas_df.count()}")

ctcontas_df.agg(
    F.countDistinct('conta_contabil').alias('distinct_accounts'),
    F.countDistinct('descricao').alias('distinct_descriptions'),
    F.countDistinct('natureza').alias('distinct_natures'),
    F.min('nivel').alias('min_level'),
    F.max('nivel').alias('max_level')
).show()
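
Since `unionByName` appends rows without removing duplicates, the combined row count should equal the sum of the per-batch counts. A minimal sketch of that check follows; `check_union_count` is a hypothetical helper and the numbers in the example are made up.

```python
def check_union_count(batch_counts, combined_count):
    """Return the difference between the combined count and the sum of batch counts."""
    expected = sum(batch_counts)
    return combined_count - expected

# Made-up counts for illustration; a result of 0 means the union kept every row.
difference = check_union_count([120, 80], 200)
print("Row difference:", difference)
```

In the notebook this would be called as `check_union_count([batch1_ct.count(), batch2_ct.count()], ctcontas_df.count())`. A non-zero difference would suggest that one batch was skipped by `load_table` or that the inputs changed between reads.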

In [None]:
(
    ctcontas_df
    .groupBy('natureza')
    .agg(
        F.count('*').alias('rows'),
        F.countDistinct('conta_contabil').alias('distinct_accounts')
    )
    .orderBy('natureza')
    .show()
)

(
    ctcontas_df
    .groupBy('nivel')
    .count()
    .orderBy('nivel')
    .show()
)

## 8. Visualizing distributions
This dataset is small enough to collect locally. Convert to Pandas and sketch quick plots. If `matplotlib` is not installed, run `pip install matplotlib`.

In [None]:
import matplotlib.pyplot as plt

ctcontas_pd = ctcontas_df.toPandas()

fig, axes = plt.subplots(1, 2, figsize=(12, 4))
ctcontas_pd['nivel'].value_counts().sort_index().plot(kind='bar', ax=axes[0], color='#4c78a8')
axes[0].set_title('Account level distribution')
axes[0].set_xlabel('Level')
axes[0].set_ylabel('Count')

ctcontas_pd['natureza'].value_counts().sort_values(ascending=True).plot(kind='barh', ax=axes[1], color='#f58518')
axes[1].set_title('Account nature distribution')
axes[1].set_xlabel('Count')
axes[1].set_ylabel('Nature')

plt.tight_layout()
plt.show()
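
`toPandas()` collects the whole `DataFrame` into driver memory, so the conversion above is only safe while the table stays small. A guarded version, sketched under the assumption that a fixed row limit is an acceptable safeguard, might look like this; `to_pandas_if_small` and its default limit are hypothetical.

```python
def to_pandas_if_small(df, max_rows=100_000):
    """Convert a Spark DataFrame to Pandas only if it has at most `max_rows` rows."""
    rows = df.count()  # triggers a Spark job; cheap when the DataFrame is cached
    if rows > max_rows:
        raise ValueError(
            f"DataFrame has {rows} rows, above the limit of {max_rows}; "
            "aggregate or sample it before collecting."
        )
    return df.toPandas()
```

For larger tables, aggregating in Spark first (as the `groupBy` cells do) and collecting only the aggregated result keeps the plotting step lightweight.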

In [None]:
ctcontas_df.orderBy('conta_contabil').show(10, truncate=False)

## 9. Saving or pushing to MinIO
After validating, persist the `DataFrame`s back to Parquet, either locally or to MinIO via the `s3a://` scheme.

```python
(combined_df.write
    .mode('overwrite')
    .format('parquet')
    .save('s3a://<bucket>/refined/efentradas'))
```
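
Writing to `s3a://` paths requires the S3A connector (the `hadoop-aws` module and its AWS SDK dependency) on Spark's classpath, plus a few Hadoop settings that point it at MinIO. The sketch below only builds those settings as a dictionary; the endpoint, the environment variable names, and the helper `minio_s3a_options` are assumptions for illustration and should be matched to the values in your `docker compose` setup.

```python
import os

def minio_s3a_options(endpoint, access_key, secret_key):
    """Build Spark config entries that direct the S3A connector at a MinIO server."""
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # MinIO is usually addressed as http://host/bucket, not http://bucket.host.
        "spark.hadoop.fs.s3a.path.style.access": "true",
        # Enable TLS only when the endpoint uses https.
        "spark.hadoop.fs.s3a.connection.ssl.enabled": str(
            endpoint.startswith("https://")
        ).lower(),
    }

# Hypothetical endpoint and variable names; adjust to the local MinIO configuration.
options = minio_s3a_options(
    endpoint=os.environ.get("MINIO_ENDPOINT", "http://localhost:9000"),
    access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
    secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
)
```

Each entry can then be passed to the session builder with `.config(key, value)` before `getOrCreate()` is called.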


## 10. Stopping the Spark session

In [None]:
spark.stop()