<a href="https://colab.research.google.com/github/2moonyo/2moonyo/blob/main/Tumi_Modiba_pyspark_exercise_V5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark homework assignment

## Context

The goal of this assignment is to get a view of your coding workflow & style. Your main focus should be creating performant & robust code for data manipulation.

For a homework assignment, we cannot grant you access to our infrastructure (Cloudera Data Platform on-premises: a Spark cluster deployed on YARN). Since the focus is on development, we provide a template notebook to get up and running quickly on Google Colab.

You are free to do this assignment on any Spark 3+ infrastructure. If you want to use a local or cloud setup, go for it!

Some of the tasks are open for interpretation.  This allows us to assess business understanding and relevant field experience.  These tasks are not pass or fail checks.  During the interview we'll ask details about the choice(s) you made.

For the assignment, you'll be working with store location data.  You might be familiar with the phrase "Location, location, location" from the real-estate context.  The same house can have a different selling price based on the location.  In fast moving consumer goods (FMCG), location is one of the most crucial aspects:

* Proximity & accessibility to customers increases convenience
* Proximity to competitors increases market pressure
* It affects the supply chain

## Evaluation criteria

1. Software engineering
   1. Clean code (e.g. using meaningful names)
   1. Robust & efficient code
   1. Styling (e.g. PEP8, or Google style guide)
   1. Documentation (e.g. docstrings)
   1. Design (e.g. SOLID principles)
1. Workflow
   1. How you use Git
   1. How you structure your assignment
   1. Owning mistakes
   1. Rationale for design decisions
   1. Making your solution accessible to others
1. Business context
   1. GDPR
   1. Fast moving consumer goods
1. (optional: own infra) System engineering
   1. What setup did you use?
   1. How did you set it up?

## Deliverables we expect

1. Private GitHub repo
   1. Colab allows you to save to GitHub
   1. Invite my username to your private repo as contributor
1. README.md with relevant content
1. Code relevant to the assignment


## Google Colab Spark setup

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
from os import environ
import findspark

In [3]:
# Setting environment variables
environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [4]:
# Init spark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

# spark.sql.repl.eagerEval.enabled: renders DataFrames as formatted tables in notebook output
# master("local[*]") uses all available cores; plain "local" runs a single worker thread
spark = (
    SparkSession
    .builder
    .appName("cg-pyspark-assignment")
    .master("local[*]")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

spark

## Getting the assignment data

This calls the API and saves each response as a .json file in the current working directory.

In [6]:
# -f: fail on HTTP errors instead of saving the error page; -sS: hide the progress meter but still show errors
!curl -fsS https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/clp-places -o clp-places.json
!curl -fsS https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/okay-places -o okay-places.json
!curl -fsS https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/spar-places -o spar-places.json
!curl -fsS https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/dats-places -o dats-places.json
!curl -fsS https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/cogo-colpnts -o cogo-colpnts.json


## Assignment instructions

1. Download the data from the API
1. Create a logger object that logs to a file "assignment.log"
   1. You can add whatever logging config you want or need
   1. At least one FileHandler, as described in these instructions
1. Implement the get_data_by_brand function
   1. Follow the instructions in its docstring
   1. The df_clp code line should work
1. No more handholding ... :-)
1. Create a single object (dataframe) that:
   1. Contains data from **all brands**
      1. Not every brand has the same columns!
   1. Drop placeSearchOpeningHours
   1. You can keep sellingPartners as an array
   1. Extract "postal_code" from address
   1. Create new column "province" derived from postal_code
   1. Transform geoCoordinates into lat and lon column
   1. One-hot-encode the handoverServices
   1. Pretend houseNumber and streetName are GDPR sensitive.
      1. How would you anonymize this data for unauthorized users?
      1. (optional) Implement the above
      1. How would you show the real data to authorized users?
      1. (optional) Implement the above
1. Save the end result as a parquet file
   1. (optional) Partitioning?

**postal_code** logic:
* "Brussel": 1000-1299  
* "Waals-Brabant": 1300-1499  
* "Vlaams-Brabant": 1500-1999, 3000-3499  
* "Antwerpen": 2000-2999  
* "Limburg": 3500-3999  
* "Luik": 4000-4999  
* "Namen": 5000-5999  
* "Henegouwen": 6000-6599, 7000-7999  
* "Luxemburg": 6600-6999  
* "West-Vlaanderen": 8000-8999  
* "Oost-Vlaanderen": 9000-9999
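
The range table above maps onto a sorted lookup, which makes the province logic easy to unit-test without a Spark session. A minimal plain-Python sketch, assuming postal codes arrive as strings (as `address.postalcode` does in the raw schema); `PROVINCE_RANGES`, `_assert_no_overlap` and `province_for_postal_code` are illustrative names, not part of the assignment template:

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

# Inclusive ranges copied from the postal code table: (province, first, last)
PROVINCE_RANGES: List[Tuple[str, int, int]] = [
    ("Brussel", 1000, 1299),
    ("Waals-Brabant", 1300, 1499),
    ("Vlaams-Brabant", 1500, 1999),
    ("Antwerpen", 2000, 2999),
    ("Vlaams-Brabant", 3000, 3499),
    ("Limburg", 3500, 3999),
    ("Luik", 4000, 4999),
    ("Namen", 5000, 5999),
    ("Henegouwen", 6000, 6599),
    ("Luxemburg", 6600, 6999),
    ("Henegouwen", 7000, 7999),
    ("West-Vlaanderen", 8000, 8999),
    ("Oost-Vlaanderen", 9000, 9999),
]

# Sort by range start so the lookup can binary-search on it
_SORTED_RANGES = sorted(PROVINCE_RANGES, key=lambda r: r[1])
_STARTS = [start for _, start, _ in _SORTED_RANGES]


def _assert_no_overlap(ranges: List[Tuple[str, int, int]]) -> None:
    """Fail fast if a typo makes two consecutive ranges overlap."""
    for (_, _, prev_end), (name, start, _) in zip(ranges, ranges[1:]):
        if start <= prev_end:
            raise ValueError(f"Postal code range for {name} overlaps the previous range")


_assert_no_overlap(_SORTED_RANGES)


def province_for_postal_code(postal_code: Optional[str]) -> Optional[str]:
    """Return the province for a postal code, or None if it is missing, non-numeric or unknown."""
    if postal_code is None or not postal_code.strip().isdigit():
        return None
    code = int(postal_code)
    # Index of the last range whose start is <= code
    idx = bisect_right(_STARTS, code) - 1
    if idx < 0:
        return None
    name, _, end = _SORTED_RANGES[idx]
    return name if code <= end else None
```

For example, `province_for_postal_code("3001")` falls in the second Vlaams-Brabant range, while `"10000"` or `"B-1000"` yield `None` rather than a wrong province.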

In [7]:
%%bash

# Tidy up the Colab working directory

# Create a data folder
mkdir -p /content/data

# Move all JSON files to data folder
mv /content/*.json /content/data/

# Force remove sample_data
rm -rf /content/sample_data/
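
A server error or an interrupted download would otherwise only surface later as a confusing Spark read. A hedged sketch of a pre-flight check, assuming each endpoint returns a JSON array of places (consistent with the per-place schema printed further down); `validate_json_downloads` is a hypothetical helper, not part of the template:

```python
import json
from pathlib import Path
from typing import Dict


def validate_json_downloads(data_dir: str) -> Dict[str, int]:
    """Check that every *.json file in data_dir parses as a non-empty JSON array.

    Returns {file name: number of records}; raises ValueError naming the first bad file.
    """
    counts: Dict[str, int] = {}
    for path in sorted(Path(data_dir).glob("*.json")):
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as err:
            raise ValueError(f"{path.name} is not valid JSON: {err}") from err
        if not isinstance(payload, list) or not payload:
            raise ValueError(f"{path.name} is not a non-empty JSON array")
        counts[path.name] = len(payload)
    if not counts:
        raise ValueError(f"No .json files found in {data_dir}")
    return counts
```

Calling `validate_json_downloads("/content/data")` right after the move gives per-file record counts that could also be logged.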

In [8]:
# python-dotenv loads variables from a .env file into the environment
!pip install -q python-dotenv

In [9]:
# Write a .env file with the SECRET_KEY value. Prompting for the key keeps it out of
# the notebook itself; share the .env (or the key) privately.
from getpass import getpass

with open(".env", "w") as f:
    f.write(f"SECRET_KEY={getpass('Choose an access key: ')}\n")

In [10]:
from dotenv import load_dotenv
load_dotenv()  # Load environment variables from .env

import os
SECRET_KEY = os.getenv("SECRET_KEY")

# Confirm the key loaded without printing its value
print("SECRET_KEY loaded:", SECRET_KEY is not None)


In [11]:
# Import statements should go here

# Standard library
import json
import logging
import os
from datetime import datetime
from getpass import getpass
from logging import Logger
from typing import Optional

# PySpark (SparkSession is already imported in the setup cell)
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    array_contains,
    broadcast,
    col,
    explode,
    from_json,
    lit,
    to_json,
    when,
)
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

In [13]:
# Check that the .env file exists without printing the secret
!test -f .env && echo ".env found" || echo ".env missing"


In [14]:
load_dotenv()

True

In [15]:
# LOGGING SETUP (conforms to NDJSON standard)

LOG_FILE = "assignment.log"
LOGGER = logging.getLogger("data_loader")
# Attach file handler if not already present
if not LOGGER.handlers:
    file_handler = logging.FileHandler(LOG_FILE, mode='a')
    formatter = logging.Formatter('%(message)s')
    file_handler.setFormatter(formatter)
    LOGGER.setLevel(logging.INFO)
    LOGGER.addHandler(file_handler)
    LOGGER.propagate = False
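
The setup above relies on every call site passing `json.dumps(...)` with its own timestamp. Note also that with the `'%(message)s'` formatter, `logger.exception` appends the traceback as extra lines, so the file stops being valid NDJSON. A hedged alternative sketch: a custom `logging.Formatter` that adds timestamp and level centrally and stores tracebacks as a single JSON string field. `NDJSONFormatter` and `build_logger` are hypothetical names; the dict-message convention is an assumption of this sketch:

```python
import json
import logging
from datetime import datetime, timezone


class NDJSONFormatter(logging.Formatter):
    """Render each record as exactly one JSON object per line (NDJSON).

    Dict messages are merged into the output; anything else goes under "message".
    A traceback, if present, is stored as a string field, so it never spans lines.
    """

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
        }
        if isinstance(record.msg, dict):
            payload.update(record.msg)
        else:
            payload["message"] = record.getMessage()
        if record.exc_info:
            payload["traceback"] = self.formatException(record.exc_info)
        # default=str keeps non-serializable values from crashing the logger
        return json.dumps(payload, default=str)


def build_logger(path: str, name: str = "data_loader") -> logging.Logger:
    """Create (or reuse) a logger that appends NDJSON lines to path."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers when the cell is re-run
        handler = logging.FileHandler(path, mode="a", encoding="utf-8")
        handler.setFormatter(NDJSONFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False
    return logger
```

With this in place, a call such as `LOGGER.info({"event": "load_success", "brand": "clp"})` would be enough, and `spark.read.json` on the file would not produce corrupt records for errors logged with `logger.exception`.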

In [16]:
# GLOBAL CONSTANTS & DATA STRUCTURES

# Path to the raw JSON file for each brand (moved to /content/data above)
BRAND_PATHS = {
    "clp": "/content/data/clp-places.json",
    "okay": "/content/data/okay-places.json",
    "spar": "/content/data/spar-places.json",
    "dats": "/content/data/dats-places.json",
    "cogo": "/content/data/cogo-colpnts.json"
}

# Registry of flattened DataFrames keyed by brand (filled by get_data_by_brand)
BRAND_DATAFRAMES = {}

# Inclusive postal code ranges mapped to provinces
PROVINCE_RANGES = [
    ("Brussel", 1000, 1299),
    ("Waals-Brabant", 1300, 1499),
    ("Vlaams-Brabant", 1500, 1999),
    ("Vlaams-Brabant", 3000, 3499),
    ("Antwerpen", 2000, 2999),
    ("Limburg", 3500, 3999),
    ("Luik", 4000, 4999),
    ("Namen", 5000, 5999),
    ("Henegouwen", 6000, 6599),
    ("Henegouwen", 7000, 7999),
    ("Luxemburg", 6600, 6999),
    ("West-Vlaanderen", 8000, 8999),
    ("Oost-Vlaanderen", 9000, 9999),
]

# Explicit schema for the province lookup table used in the range join
PROVINCE_SCHEMA = StructType([
    StructField("province", StringType(), True),
    StructField("start_pc", IntegerType(), True),
    StructField("end_pc", IntegerType(), True),
])

# Small lookup DataFrame; enrich_with_province broadcasts it
province_df = spark.createDataFrame(PROVINCE_RANGES, PROVINCE_SCHEMA)

In [17]:
from pyspark.sql import functions as F


def log_dataframe_summary(df: DataFrame, label: str = "DataFrame", logger: Optional[Logger] = LOGGER):
    """
    Logs a DataFrame summary: row/column counts, column names, data types and
    the number of non-null values per column.

    Row and non-null counts are computed in a single aggregation (one Spark job).
    """

    try:
        # One pass over the data: total rows plus non-null count per column
        aggregates = (
            df.agg(
                F.count(F.lit(1)).alias("__row_count"),
                *[F.sum(F.col(c).isNotNull().cast("int")).alias(c) for c in df.columns],
            )
            .collect()[0]
            .asDict()
        )
        row_count = aggregates.pop("__row_count")

        # Log aggregations
        logger.info(json.dumps({
            "event": "dataframe_summary",
            "label": label,
            "row_count": row_count,
            "column_count": len(df.columns),
            "column_names": df.columns,
            "data_types": df.dtypes,  # list of (column name, data type)
            "non_null_counts": aggregates,
            "timestamp": datetime.now().isoformat()
        }))

    except Exception as e:
        # logger.error (not logger.exception) keeps the entry on one line, so the file stays valid NDJSON
        logger.error(json.dumps({
            "event": "summary_failure",
            "label": label,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }))

In [18]:



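import hmac


def check_access_and_mask(df: DataFrame, logger: Logger = LOGGER) -> DataFrame:
    """
    Prompts for an access key. If it does not match SECRET_KEY, masks the sensitive
    address fields (house number and street name); otherwise returns df unchanged.
    """

    user_key = getpass("🔐 Enter access key to view full address: ")

    # Constant-time comparison; if SECRET_KEY failed to load, always mask
    is_authorized = SECRET_KEY is not None and hmac.compare_digest(
        user_key.encode("utf-8"), SECRET_KEY.encode("utf-8")
    )

    if not is_authorized: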
        masked_df = (
            df.withColumn("address_streetName", lit("xxx"))
              .withColumn("address_houseNumber", lit("xxx"))
        )

        logger.info(json.dumps({
            "event": "address_masked",
            "status": "unauthorized",
            "timestamp": datetime.now().isoformat()
        }))
        return masked_df

    else:
        logger.info(json.dumps({
            "event": "address_revealed",
            "status": "authorized",
            "timestamp": datetime.now().isoformat()
        }))
        return df
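
Replacing both fields with a constant `"xxx"` hides the data but also destroys its analytical value (e.g. counting stores per street). A hedged alternative for unauthorized users is keyed pseudonymization: an HMAC-SHA256 token is stable for the same input, so masked columns can still be grouped or joined, but cannot be reversed by hashing a list of known street names without the key. Under GDPR, pseudonymized data still counts as personal data, so the key must be stored and access-controlled separately. `pseudonymize` and the whitespace/case normalization are assumptions of this sketch:

```python
import hashlib
import hmac
from typing import Optional


def pseudonymize(value: Optional[str], key: bytes) -> Optional[str]:
    """Replace a sensitive value with a keyed SHA-256 token (HMAC), hex-encoded.

    The same value and key always give the same token; None stays None.
    """
    if value is None:
        return None
    # Normalize whitespace and case so "Kerkstraat " and "kerkstraat" map to one token
    normalized = " ".join(value.split()).lower()
    return hmac.new(key, normalized.encode("utf-8"), hashlib.sha256).hexdigest()
```

In Spark this could be applied through a Python UDF; the built-in `sha2(concat(lit(salt), col("address_streetName")), 256)` avoids UDF overhead but is a salted hash rather than a true HMAC.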

In [19]:
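def get_data_by_brand(brand: str, logger: Optional[Logger] = LOGGER) -> DataFrame:
    """
    Loads the raw JSON file for a brand, prints and logs its schema, and returns it flattened.

    The result is also stored in BRAND_DATAFRAMES and exposed as a global (e.g. df_clp).
    Raises ValueError for an unknown brand and FileNotFoundError if the file is missing.
    """
    brand = brand.lower()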

    if brand not in BRAND_PATHS:
        logger.error(json.dumps({
            "event": "unsupported_brand",
            "brand": brand,
            "message": "Unsupported brand provided",
            "timestamp": datetime.now().isoformat()
        }))
        raise ValueError(f"Unsupported brand: {brand}")

    path = BRAND_PATHS[brand]
    if not os.path.exists(path):
        logger.error(json.dumps({
            "event": "file_not_found",
            "brand": brand,
            "path": path,
            "timestamp": datetime.now().isoformat()
        }))
        raise FileNotFoundError(f"No data found at: {path}")

    try:
        # Load JSON as-is
        df = spark.read.option("multiline", "true").json(path)

        # Print raw nested schema
        print(f"\n🔍 Raw schema for brand '{brand}':")
        df.printSchema()

        # Log row/column counts
        row_count = df.count()
        if row_count == 0:
            logger.warning(json.dumps({
                "event": "empty_data_warning",
                "brand": brand,
                "path": path,
                "message": "No records found.",
                "timestamp": datetime.now().isoformat()
            }))
        else:
            logger.info(json.dumps({
                "event": "load_success",
                "brand": brand,
                "path": path,
                "row_count": row_count,
                "column_count": len(df.columns),
                "columns": df.columns,
                "schema": df.schema.simpleString(),
                "timestamp": datetime.now().isoformat()
            }))

        # Flatten after printing original schema
        df_flat = flatten_df(df, logger=logger)


        # Expose global variable like df_clp, df_spar, etc.
        globals()[f"df_{brand}"] = df_flat
        BRAND_DATAFRAMES[brand] = df_flat

        return df_flat

    except Exception as e:
        logger.error(json.dumps({  # one line per entry keeps the log valid NDJSON
            "event": "load_or_flatten_failure",
            "brand": brand,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }))
        raise


In [20]:
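def get_flattened_fields(schema: StructType, prefix: str = "") -> list:
    """
    Recursively builds column expressions that flatten nested structs.

    Struct fields become top-level columns joined with "_" (address.postalcode ->
    address_postalcode); arrays are serialized to JSON strings with to_json.
    """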
    fields = []

    for field in schema.fields:
        name = field.name
        dtype = field.dataType
        full_name = f"{prefix}.{name}" if prefix else name
        alias_name = full_name.replace(".", "_")

        if isinstance(dtype, StructType):
            fields += get_flattened_fields(dtype, full_name)
        elif isinstance(dtype, ArrayType):
            fields.append(to_json(col(full_name)).alias(alias_name))
        else:
            fields.append(col(full_name).alias(alias_name))

    return fields
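
Because `get_flattened_fields` only builds column expressions, its naming rules are easiest to check on a plain record. A minimal pure-Python analogue for unit tests on sample API records; `flatten_record` is a hypothetical helper, and its compact JSON for lists is meant to resemble Spark's `to_json` output, not reproduce it byte for byte:

```python
import json
from typing import Any, Dict


def flatten_record(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into "_"-joined keys; lists become compact JSON strings."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            # Nested struct: recurse with the accumulated name as prefix
            flat.update(flatten_record(value, name))
        elif isinstance(value, list):
            # Arrays are kept as a single JSON string column
            flat[name] = json.dumps(value, separators=(",", ":"))
        else:
            flat[name] = value
    return flat
```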

In [21]:
def flatten_df(df: DataFrame, logger: Optional[Logger] = LOGGER) -> DataFrame:
    """
    Applies flattening to a DataFrame with nested structures. Logs before and after.
    """

    logger.info(json.dumps({
        "event": "flatten_start",
        "message": "Flattening DataFrame schema",
        "original_columns": df.columns,
        "timestamp": datetime.now().isoformat()
    }))

    try:
        flattened = df.select(*get_flattened_fields(df.schema))
        logger.info(json.dumps({
            "event": "flatten_success",
            "message": "Flattened DataFrame",
            "flattened_columns": flattened.columns,
            "timestamp": datetime.now().isoformat()
        }))
        return flattened
    except Exception as e:
        logger.error(json.dumps({  # one line per entry keeps the log valid NDJSON
            "event": "flatten_error",
            "message": "Failed to flatten DataFrame",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }))
        raise


In [22]:
def align_to_reference(df: DataFrame, brand_name: str, ref_cols: list, all_cols: set) -> DataFrame:
    """
    Aligns a DataFrame to the combined schema so all brands can be unioned.

    Sets a "Brand" column to brand_name and adds any missing column as nulls.
    Column order is deterministic: "Brand", then ref_cols, then the remaining
    columns sorted by name (iterating a set directly gives no stable order).
    """
    ordered_columns = ["Brand"] + [c for c in ref_cols if c != "Brand"]
    seen = set(ordered_columns)
    ordered_columns += sorted(c for c in all_cols if c not in seen)

    existing = set(df.columns)
    # A single select instead of one withColumn per missing column keeps the plan small
    projection = [lit(brand_name).alias("Brand")] + [
        col(c) if c in existing else lit(None).alias(c)
        for c in ordered_columns[1:]
    ]
    return df.select(*projection)


In [23]:
def union_all_brands(dataframes: dict, logger: Logger = LOGGER) -> DataFrame:
    """
    Combines all brand DataFrames into one, adding a "Brand" column and dropping
    placeSearchOpeningHours. Columns missing for a brand are filled with nulls.
    """
    widest_df_entry = max(dataframes.items(), key=lambda item: len(item[1].columns))
    reference_columns = widest_df_entry[1].columns
    all_columns = set().union(*[set(df.columns) for df in dataframes.values()])

    # align_to_reference sets the Brand column, so no separate withColumn is needed
    aligned = [
        align_to_reference(df, brand, reference_columns, all_columns)
        for brand, df in dataframes.items()
    ]

    df_all = aligned[0]
    for df in aligned[1:]:
        df_all = df_all.unionByName(df, allowMissingColumns=True)

    if "placeSearchOpeningHours" in df_all.columns:
        df_all = df_all.drop("placeSearchOpeningHours")

    logger.info(json.dumps({
        "event": "union_complete",
        "row_count": df_all.count(),
        "column_count": len(df_all.columns),
        "timestamp": datetime.now().isoformat()
    }))

    return df_all
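
The alignment rules are easier to reason about on plain records. A hypothetical pure-Python model (`align_records` is illustrative only) of the intended result of aligning each brand and unioning by name: "Brand" first, the widest brand's columns next, leftover columns sorted for a stable order, and `None` wherever a brand lacks a column:

```python
from typing import Any, Dict, List, Mapping


def align_records(records_by_brand: Mapping[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Union records from several brands into rows that all share one key order."""
    # Column list per brand, in first-seen order
    columns_by_brand = {
        brand: list(dict.fromkeys(key for rec in recs for key in rec))
        for brand, recs in records_by_brand.items()
    }
    # Ties go to the first brand, like max() over the DataFrame dict
    widest = max(columns_by_brand.values(), key=len, default=[])
    all_columns = set().union(*columns_by_brand.values())

    ordered = ["Brand"] + [c for c in widest if c != "Brand"]
    ordered += sorted(all_columns - set(ordered))

    return [
        {"Brand": brand, **{c: rec.get(c) for c in ordered[1:]}}
        for brand, recs in records_by_brand.items()
        for rec in recs
    ]
```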

In [24]:
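def enrich_with_province(df: DataFrame, province_df: DataFrame, logger: Logger = LOGGER) -> DataFrame:
    """
    Adds a "province" column by range-joining address_postalcode onto province_df.

    province_df is tiny, so it is broadcast; postal codes outside every range get a null province.
    """

    # address_postalcode is a string in the raw data; cast explicitly instead of relying on implicit coercion
    postal_code = df["address_postalcode"].cast("int")

    df_with_province = df.join(
        broadcast(province_df),
        (postal_code >= province_df["start_pc"]) &
        (postal_code <= province_df["end_pc"]),
        how="left"
    ).drop("start_pc", "end_pc")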

    log_dataframe_summary(df_with_province.limit(100), label="Sample with Provinces", logger=logger)
    return df_with_province


In [25]:
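def expand_handover_services(df: DataFrame, logger: Logger = LOGGER) -> DataFrame:
    """
    One-hot encodes handoverServices into one integer flag column per distinct
    service (handover_<service>: 1 if offered, else 0).

    Expects handoverServices as a JSON array string, as produced by flatten_df.
    """

    df = df.withColumn(
        "handoverServices_array",
        from_json("handoverServices", ArrayType(StringType()))
    )

    # The set of services is small, so collecting it to the driver is cheap;
    # sorting gives a stable column order between runs
    distinct_services = sorted(
        row.service
        for row in (
            df.select(explode("handoverServices_array").alias("service"))
            .where(col("service").isNotNull())
            .distinct()
            .collect()
        )
    )

    # when/otherwise maps a null array (no services) to 0 instead of null
    flag_columns = [
        when(array_contains(col("handoverServices_array"), service), 1)
        .otherwise(0)
        .alias(f"handover_{service}")
        for service in distinct_services
    ]

    logger.info(json.dumps({
        "event": "handover_services_encoded",
        "services": distinct_services,
        "timestamp": datetime.now().isoformat()
    }))

    df = df.select("*", *flag_columns).drop("handoverServices_array")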
    return df
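
The encoding rules (one flag per distinct service, sorted columns, all zeros for a missing list) can be checked without Spark. A minimal pure-Python sketch of the same idea; `one_hot_services` is a hypothetical helper and `"DELIVERY"`/`"PICKUP"` are made-up sample values, not the API's actual service names:

```python
import json
from typing import Dict, List, Optional


def one_hot_services(raw_values: List[Optional[str]]) -> List[Dict[str, int]]:
    """One-hot encode JSON-encoded service arrays into handover_<service> flags (0/1).

    Missing or JSON-null arrays produce all-zero rows; columns are sorted for stability.
    """
    parsed = [(json.loads(v) or []) if v else [] for v in raw_values]
    services = sorted({s for items in parsed for s in items if s is not None})
    return [
        {f"handover_{s}": int(s in items) for s in services}
        for items in parsed
    ]
```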


In [26]:
def view_logs(path: str = LOG_FILE) -> DataFrame:
    """
    Reads the NDJSON log file into a Spark DataFrame (one row per log line).
    """
    return spark.read.json(path)


In [29]:
def main():
    """
    Runs the full pipeline: loads and flattens each brand, unions them, adds the
    province, one-hot encodes handover services and applies GDPR masking.
    Returns (df_final, logs_df).
    """
    logger = LOGGER  # Ensure logger is initialized

    # Load and flatten each brand, and assign globals & dictionary
    for brand in BRAND_PATHS:
        try:
            get_data_by_brand(brand, logger=logger)
        except Exception:
            # Already logged in get_data_by_brand; continue with the remaining brands
            continue

    if not BRAND_DATAFRAMES:
        logger.error(json.dumps({
            "event": "no_dataframes_loaded",
            "message": "All brand files failed to load or flatten.",
            "timestamp": datetime.now().isoformat()
        }))
        raise RuntimeError("No dataframes were successfully loaded.")

    # Log summaries of flat dataframes
    for brand, df in BRAND_DATAFRAMES.items():
        log_dataframe_summary(df, label=f"{brand} DataFrame", logger=logger)

    # Align all DataFrames to widest schema and union
    df_all_brands = union_all_brands(BRAND_DATAFRAMES, logger=logger)
    log_dataframe_summary(df_all_brands, label="Combined Brands DataFrame", logger=logger)

    # Add province via a range join on the postal code
    df_with_province = enrich_with_province(df_all_brands, province_df, logger=logger)

    # One-hot encode handoverServices
    df_final = expand_handover_services(df_with_province, logger=logger)

    # Apply GDPR masking (prompts for the access key)
    df_final = check_access_and_mask(df_final, logger=logger)

    # View logs
    logs_df = view_logs(path=LOG_FILE)

    # Return the final DataFrame and the logs
    return df_final, logs_df


In [30]:
# Trigger main and display output.
# When prompted, enter the SECRET_KEY value from the privately shared .env file to see unmasked addresses.
df_final, logs_df = main()
df_final.show(5, truncate=False)



🔍 Raw schema for brand 'clp':
root
 |-- address: struct (nullable = true)
 |    |-- cityName: string (nullable = true)
 |    |-- countryCode: string (nullable = true)
 |    |-- countryName: string (nullable = true)
 |    |-- houseNumber: string (nullable = true)
 |    |-- postalcode: string (nullable = true)
 |    |-- streetName: string (nullable = true)
 |-- branchId: string (nullable = true)
 |-- commercialName: string (nullable = true)
 |-- ensign: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- geoCoordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- handoverServices: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- isActive: boolean (nullable = true)
 |-- moreInfoUrl: string (nullable = true)
 |-- placeId: long (nullable = true)
 |-- placeSearchOpeningHours: array (nullable = true)
 |    |-- element: struct (contai

In [31]:
# Write parquet in partitions per Brand
df_final.write.mode("overwrite").partitionBy("Brand").parquet("/content/brand_output")


In [None]:
# Load all Parquet files from output
df_parquet = spark.read.parquet("/content/brand_output")

# Show data sample
df_parquet.show(5, truncate=False)

+----------------+-------------------+-------------------+-------------------+------------------+------------------+--------+---------------------------+---------+---------------+-----------------------+------------------------+----------------------------------------------------------------+--------+------------------------------------------------------------------------------------------+-------+------------+------------------+------------------------------+-------------------------------------------------------------+---------------------------+------------+-----------------+---------------+-----------------------+-----------------------+-----------------------------+----------------------------+-----------------------------+-----+
|address_cityName|address_countryCode|address_countryName|address_houseNumber|address_postalcode|address_streetName|branchId|commercialName             |ensign_id|ensign_name    |geoCoordinates_latitude|geoCoordinates_longitude|handoverServices           

In [None]:
# Check individual brand dataframes
df_dats.show(5, truncate=False)

+----------------+-------------------+-------------------+-------------------+------------------+---------------------+--------+---------------------------+---------+-----------+-----------------------+------------------------+--------+------------------------------------------------------------------------+-------+------------+------------------+------------------------------+------------------------------------------------+------------+-----------------+
|address_cityName|address_countryCode|address_countryName|address_houseNumber|address_postalcode|address_streetName   |branchId|commercialName             |ensign_id|ensign_name|geoCoordinates_latitude|geoCoordinates_longitude|isActive|moreInfoUrl                                                             |placeId|placeType_id|placeType_longName|placeType_placeTypeDescription|routeUrl                                        |sourceStatus|temporaryClosures|
+----------------+-------------------+-------------------+-------------------+

In [None]:
import shutil

# ==== NB! Clean-up cell: only run when you need to start over ====
# Uncomment to remove the Parquet output (e.g. if an interrupted write corrupted it)
# shutil.rmtree("/content/brand_output", ignore_errors=True)

# Remove the downloaded raw JSON files
shutil.rmtree("/content/data", ignore_errors=True)

In [32]:
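# Keep secrets, Colab config, the Spark download and generated data out of Git
with open(".gitignore", "w") as f:
    f.write(".env\n.config/\nspark-3.4.1-bin-hadoop3*\ndata/\nbrand_output/\n")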


hint: Using 'master' as the name for the initial branch. This default branch name
hint: is subject to change. To configure the initial branch name to use in all
hint: 
hint: 	git config --global init.defaultBranch <name>
hint: 
hint: Names commonly chosen instead of 'master' are 'main', 'trunk' and
hint: 'development'. The just-created branch can be renamed via this command:
hint: 
hint: 	git branch -m <name>
Initialized empty Git repository in /content/.git/
[master (root-commit) c214b2d] Initial commit
 1575 files changed, 361776 insertions(+)
 create mode 100644 .config/.last_opt_in_prompt.yaml
 create mode 100644 .config/.last_survey_prompt.yaml
 create mode 100644 .config/.last_update_check.json
 create mode 100644 .config/active_config
 create mode 100644 .config/config_sentinel
 create mode 100644 .config/configurations/config_default
 create mode 100644 .config/default_configs.db
 create mode 100644 .config