In [None]:
import requests
import json
import os
import datetime

In [13]:
### This python code can run on a Lambda which runs and pushes data into a DataLake/s3 at 't' interval

# For Local -> pushes into a folder

req_data = requests.get('https://fakerapi.it/api/v1/products?_quantity=1&_taxes=12&_categories_type=uuid')
data_json = req_data.json()
# data_json
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')

with open('raw_data_lake/file_'+timestamp+'.json', 'w') as f:
    json.dump(data_json, f)
    
    

In [15]:
files = os.listdir('raw_data_lake')
files


['file_2024-02-29_19:35:01.json',
 'file_2024-02-29_19:14:06.json',
 'file_2024-02-29_19:14:05.json',
 'file_2024-02-29_19:34:57.json',
 'file_2024-02-29_19:34:59.json',
 'file_2024-02-29_21:18:46.json']

In [8]:
# import duckdb
# cursor = duckdb.connect()
# duckdb.sql("SELECT * FROM read_json_auto('data.json')")


┌─────────┬───────┬───────┬────────────────────────────────────────────────────────────────────────────────────────────┐
│ status  │ code  │ total │                                            data                                            │
│ varchar │ int64 │ int64 │ struct(id bigint, "name" varchar, description varchar, ean varchar, upc varchar, image v…  │
├─────────┼───────┼───────┼────────────────────────────────────────────────────────────────────────────────────────────┤
│ OK      │   200 │     1 │ [{'id': 1, 'name': Aut earum sequi nam aut., 'description': Assumenda incidunt iste porr…  │
└─────────┴───────┴───────┴────────────────────────────────────────────────────────────────────────────────────────────┘

Loading Data into BRONZE Layer

In [14]:
#loads data into duckdb -> Local Solution

# We can load data into any database service like databricks and 
# load it into delta tables (Solves our requirements of getting data in any point of time)

import os
import duckdb
import pandas as pd

def read_json_files(directory):
    dfs = []
    
    files = os.listdir(directory)
    
    for file in files:
        if file.endswith(".json"):
            file_path = os.path.join(directory, file)
            
            # Read the JSON file into a DataFrame using DuckDB
            con = duckdb.connect(':memory:')
            con.execute(f'CREATE TABLE temp AS SELECT * FROM read_json_auto(\'{file_path}\')')
            df = con.execute('SELECT * FROM temp').fetchdf()
            con.close()
            
            dfs.append(df)
    
    combined_df = pd.concat(dfs, ignore_index=True)
    
    return combined_df

directory = './raw_data_lake'

# Read JSON files into a DataFrame
df = read_json_files(directory)

# Display the DataFrame
print(df)

  status  code  total                                               data
0     OK   200      1  [{'id': 1, 'name': 'Ex quia non est.', 'descri...
1     OK   200      1  [{'id': 1, 'name': 'Unde ab sit est minus.', '...
2     OK   200      1  [{'id': 1, 'name': 'Quis aliquam voluptatem eo...
3     OK   200      1  [{'id': 1, 'name': 'Totam illum quod quis mini...
4     OK   200      1  [{'id': 1, 'name': 'Eos animi dolor aperiam ip...
5     OK   200      1  [{'id': 1, 'name': 'Ipsa laborum dolore evenie...


Working data into SILVER Layer

In [None]:
# create spark dataframes to run large transformations in distributed systems
import pyspark.sql.functions as F

spark_df = spark.createDataFrame(df)

# We check for arrays in our data column field and create new rows for each element
# Explode the array column to create multiple rows
exploded_df = spark_df.select(col("id"), explode("data").alias("json_string"))

#This parses our json strings into a queryable format
ready_to_write_into_table_df = exploded_df.withColumn('json_data',F.from_json(df.value,MapType(StringType(),StringType())))

# Now we can build out our transformations/normalization/deduplication/features on top on this data

#Requirement Gathering and Pre- Processing

Setup Fault Tolerance and Setup Unit Tests for our Data as per requirements
- python udfs / pytest
- sqlmesh

For Orchestration->
- We can use Airflow/ Overwatch / Tools like Azure Data Factory
- Audit Logs management with services like Splunk

For Governance and Isolation of Catalogs/Lineage
- Unity Catalog / Microsoft Purview

Delta tables need maintenance as well since its a file system type database

- VACUUM is used to clean up unused and stale data files that are taking up unnecessary storage space. Removing these files can help reduce storage costs

    > When you run VACUUM on a Delta table it removes the following files from the underlying file system:
    > - Any data files that are not maintained by Delta Lake
    > - Removes stale data files (files that are no longer referenced by a Delta table) and are older than 7 days


- OPTIMIZE/ Z-Ordering -> Optionally optimize a subset of data or collocate data by column. If you do not specify collocation and the table is not defined with liquid clustering, bin-packing optimization is performed.

    > Bin-packing aims to produce evenly-balanced data files with respect to their size on disk, but not necessarily number of tuples per file.