In [0]:
%pip install koboextractor 

In [0]:
import os

import pandas as pd
from koboextractor import KoboExtractor
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, to_date
from pyspark.sql.types import ArrayType, StringType, DoubleType

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Kobo credentials and API setup.
# The API token is read from the environment so that it is never stored in the
# notebook source. Any token that was previously committed here should be
# treated as compromised and rotated.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) is an
# equivalent source - use whichever the workspace standardises on.
api_token = os.environ.get("KOBO_API_TOKEN")
if not api_token:
    raise RuntimeError(
        "KOBO_API_TOKEN is not set; provide the KoboToolbox API token through "
        "the environment before running this notebook."
    )
form_id = 'aKhrYSrKMa6Qtfgzvfjnhn'
kobo_base_url = 'https://kobo.humanitarianresponse.info/api/v2'

# Initialize KoboExtractor
kobo = KoboExtractor(api_token, kobo_base_url)

# Function to fetch KoboToolbox data and return as a Spark DataFrame
def fetch_kobo_to_spark(api_token, form_id, base_url):
    """Fetch a KoboToolbox form's submissions and return them as a Spark DataFrame.

    Args:
        api_token: KoboToolbox API token used to build the client.
        form_id: Identifier of the form passed to ``KoboExtractor.get_data``.
        base_url: Base URL of the KoboToolbox API used to build the client.

    Returns:
        A Spark DataFrame created from the flattened ``"results"`` records.

    Raises:
        RuntimeError: If fetching, flattening or converting the data fails.
            The underlying exception is chained as ``__cause__``.
    """
    try:
        # Build the client from the arguments so the function honours the
        # credentials and endpoint it is given rather than a notebook global.
        client = KoboExtractor(api_token, base_url)
        payload = client.get_data(form_id)

        if "results" not in payload:
            raise KeyError("'results' key not found in KoboToolbox API response.")

        # Flatten the nested JSON records into a tabular pandas frame.
        records_pdf = pd.json_normalize(payload["results"])

        # Optional: parse the timestamp column when the form provides one.
        # NOTE(review): Kobo's own metadata column may be named
        # `_submission_time` instead - confirm against the actual payload.
        if 'submission_time' in records_pdf.columns:
            records_pdf['submission_time'] = pd.to_datetime(records_pdf['submission_time'])

        # Hand the pandas frame over to Spark.
        return spark.createDataFrame(records_pdf)
    except Exception as e:
        # Chain explicitly so the original traceback stays attached.
        raise RuntimeError(f"Failed to fetch or process data: {e}") from e

# Pull the form's submissions into a Spark DataFrame
spark_df = fetch_kobo_to_spark(api_token, form_id, kobo_base_url)

# Array columns can be inferred as arrays of NullType, which Delta cannot
# store; give each one an explicit element type instead.
array_column_types = {
    "_attachments": ArrayType(StringType()),
    "_geolocation": ArrayType(DoubleType()),
    "_tags": ArrayType(StringType()),
    "_notes": ArrayType(StringType()),
}
for column_name, array_type in array_column_types.items():
    spark_df = spark_df.withColumn(column_name, col(column_name).cast(array_type))

# Replace the contents of the raw submissions table
spark_df.write.mode("overwrite").saveAsTable("kobo_submission")

# Show the first rows as a sanity check
display(spark_df.limit(10))

In [0]:
import os
import pandas as pd
import time
from pyspark.sql import SparkSession
from koboextractor import KoboExtractor
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, StringType, DoubleType

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Kobo credentials and API setup.
# The API token is read from the environment so that it is never stored in the
# notebook source. Any token that was previously committed here should be
# treated as compromised and rotated.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) is an
# equivalent source - use whichever the workspace standardises on.
api_token = os.environ.get("KOBO_API_TOKEN")
if not api_token:
    raise RuntimeError(
        "KOBO_API_TOKEN is not set; provide the KoboToolbox API token through "
        "the environment before running this notebook."
    )
form_id = 'aKhrYSrKMa6Qtfgzvfjnhn'
kobo_base_url = 'https://kobo.humanitarianresponse.info/api/v2'

# Initialize KoboExtractor
kobo = KoboExtractor(api_token, kobo_base_url)

# Function to fetch KoboToolbox data and return as a Spark DataFrame
def fetch_kobo_to_spark(api_token, form_id, base_url):
    """Fetch a KoboToolbox form's submissions and return them as a Spark DataFrame.

    Args:
        api_token: KoboToolbox API token used to build the client.
        form_id: Identifier of the form passed to ``KoboExtractor.get_data``.
        base_url: Base URL of the KoboToolbox API used to build the client.

    Returns:
        A Spark DataFrame created from the flattened ``"results"`` records.

    Raises:
        RuntimeError: If fetching, flattening or converting the data fails.
            The underlying exception is chained as ``__cause__``.
    """
    try:
        # Build the client from the arguments so the function honours the
        # credentials and endpoint it is given rather than a notebook global.
        client = KoboExtractor(api_token, base_url)
        payload = client.get_data(form_id)

        if "results" not in payload:
            raise KeyError("'results' key not found in KoboToolbox API response.")

        # Flatten the nested JSON records into a tabular pandas frame.
        records_pdf = pd.json_normalize(payload["results"])

        # Optional: parse the timestamp column when the form provides one.
        # NOTE(review): Kobo's own metadata column may be named
        # `_submission_time` instead - confirm against the actual payload.
        if 'submission_time' in records_pdf.columns:
            records_pdf['submission_time'] = pd.to_datetime(records_pdf['submission_time'])

        # Hand the pandas frame over to Spark.
        return spark.createDataFrame(records_pdf)
    except Exception as e:
        # Chain explicitly so the original traceback stays attached.
        raise RuntimeError(f"Failed to fetch or process data: {e}") from e

# Function to continuously ingest data
def continuous_data_ingestion(api_token, form_id, base_url, interval=300, max_iterations=None):
    """Poll KoboToolbox on a fixed interval and append unseen submissions to Delta.

    Each cycle fetches the form through ``fetch_kobo_to_spark``, casts the
    array columns to concrete element types, removes rows whose ``_id`` is
    already present in the ``kobo_submission`` table, and appends the rest.
    Without that filter every cycle would re-append the full fetch and
    duplicate previously stored submissions.

    Args:
        api_token: Forwarded to ``fetch_kobo_to_spark``.
        form_id: Forwarded to ``fetch_kobo_to_spark``.
        base_url: Forwarded to ``fetch_kobo_to_spark``.
        interval: Seconds to sleep between cycles.
        max_iterations: Optional cap on the number of cycles. ``None`` (the
            default) polls indefinitely.

    Returns:
        None. Only returns when ``max_iterations`` is given.

    Errors raised inside a cycle are printed and the loop moves on to the next
    cycle, so one failed fetch does not stop ingestion.
    """
    target_table = "kobo_submission"

    # Array columns can be inferred as arrays of NullType, which Delta cannot
    # store; each gets an explicit element type.
    array_column_types = {
        "_attachments": ArrayType(StringType()),
        "_geolocation": ArrayType(DoubleType()),
        "_tags": ArrayType(StringType()),
        "_notes": ArrayType(StringType()),
    }

    completed = 0
    while max_iterations is None or completed < max_iterations:
        try:
            # Fetch data and convert to Spark DataFrame
            spark_df = fetch_kobo_to_spark(api_token, form_id, base_url)

            for column_name, array_type in array_column_types.items():
                spark_df = spark_df.withColumn(column_name, col(column_name).cast(array_type))

            # Keep only submissions that are not stored yet.
            # Assumes `_id` uniquely identifies a submission - TODO confirm
            # against the form's payload.
            if "_id" in spark_df.columns and spark.catalog.tableExists(target_table):
                stored = spark.table(target_table)
                if "_id" in stored.columns:
                    spark_df = spark_df.join(stored.select("_id"), on="_id", how="left_anti")

            # Write to Delta table
            spark_df.write.mode("append").saveAsTable(target_table)

            # Optional: Display a preview of the rows appended in this cycle
            display(spark_df.limit(10))

        except Exception as e:
            # Best effort: report and retry on the next cycle.
            print(f"Error during ingestion: {e}")

        completed += 1
        if max_iterations is not None and completed >= max_iterations:
            # No point sleeping after the final requested cycle.
            break

        # Wait for 'interval' seconds before fetching data again
        time.sleep(interval)

# Start the continuous data ingestion process (one fetch every 5 minutes).
# NOTE: called like this, the ingestion loop is unbounded, so this call does
# not return on its own; the cells below it will not execute during a
# "Run All" until this cell is interrupted.
continuous_data_ingestion(api_token, form_id, kobo_base_url, interval=300)  # interval in seconds


In [0]:
# Silver layer: cleaned and lightly enriched view of the raw submissions.

# Fields carried over from the raw DataFrame.
silver_columns = [
    'submission_time',
    'latitude',
    'longitude',
    'deviceid',
    'formhub/uuid',
    'meta/instanceID',
    'formhub/submitter_id',
    'location',
    'other_fields',  # Add specific fields you need
]

# True when both coordinates are present on the row.
has_coordinates = col("latitude").isNotNull() & col("longitude").isNotNull()

silver_df = (
    spark_df.select(*silver_columns)
    # Discard rows that are missing either coordinate.
    .na.drop(subset=['latitude', 'longitude'])
    # Timestamp-typed copy of the submission time under its own column name.
    .withColumn("_submission_time", col("submission_time").cast("timestamp"))
    # Enrichment example: explicit flag for rows with usable coordinates.
    .withColumn("is_valid_location", has_coordinates.cast("boolean"))
)

# Persist the refined data as the silver Delta table.
silver_df.write.format("delta").mode("overwrite").saveAsTable("kobo_silver")


In [0]:
# Gold layer: aggregated, analysis-ready data built from the silver DataFrame.
# Relies on `avg`, `count` and `to_date` from pyspark.sql.functions being
# imported in the notebook's import cell.

# Groups must have more than this many submissions to be kept.
MIN_SUBMISSIONS = 100

# Count submissions per calendar day and location. The timestamp is reduced to
# a date first: grouping on the raw timestamp would produce one group per
# distinct instant instead of one per day.
gold_df = silver_df.groupBy(
    to_date(col("submission_time")).alias("submission_date"),
    col("location"),  # Replace with specific region or other grouping fields
).agg(
    count("*").alias("total_submissions"),
    avg("latitude").alias("avg_latitude"),
    avg("longitude").alias("avg_longitude"),
)

# Keep only day/location groups above the submission threshold.
gold_df = gold_df.filter(col("total_submissions") > MIN_SUBMISSIONS)

# Write to Delta table for Gold Layer
gold_df.write.format("delta").mode("overwrite").saveAsTable("kobo_gold")
