In [3]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Build a session against the standalone cluster with Delta Lake's SQL
# extension and catalog registered; the Delta and ANTLR jars are supplied
# explicitly from /opt/spark/jars.
builder = (
    SparkSession.builder
    .appName("ReadBronzeTable")
    .master("spark://spark-master:7077")
    .config("spark.eventLog.enabled", "false")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars", "/opt/spark/jars/delta-spark_2.12-3.3.2.jar,/opt/spark/jars/delta-storage-3.3.2.jar,/opt/spark/jars/antlr4-runtime-4.9.3.jar")
)

# Let delta-spark's helper finish configuring the builder, then create
# (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Smoke test only: a trivial query proves the session can run a job.
spark.sql('SELECT 1 AS a').show()

[Stage 0:>                                                          (0 + 1) / 1]

+---+
|  a|
+---+
|  1|
+---+



                                                                                

In [4]:
# Shut down the Spark session created in the setup cell.
# NOTE(review): cells further down still call `spark.read` / `spark.sparkContext`;
# on Restart & Run All they need a live session, so re-create it before using them.
spark.stop()

In [None]:
from glob import glob

# List the top-level entries of the local warehouse directory (the Delta read
# below loads from bronze.db inside it). Sorted so the listing is stable
# between runs.
for entry in sorted(glob('../opt/airflow/data/warehouse/*')):
    print(entry)

In [None]:
# NOTE(review): duplicate of the earlier `spark.stop()` cell. The read cells
# below need a live session, so running this leaves them broken.
# TODO remove one of the two stop cells, or move stopping to the end of the notebook.
spark.stop()

In [None]:
# Earlier cells call spark.stop(), so obtain a live session first:
# getOrCreate() reuses an active session if one exists, otherwise it starts a
# new one from the Delta-configured builder defined in the setup cell.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Show the first rows of the bronze Divvy bikes Delta table.
spark.read.format('delta').load('../opt/airflow/data/warehouse/bronze.db/divvy_bikes').show()

In [None]:
def get_bash_command(path_name:str) -> str:
    """Build the bash script that archives one feed's files.

    The returned text is a template: the ``{{ params.path }}``,
    ``{{ params.file_pattern }}`` and ``{{ params.n_days }}`` placeholders are
    left untouched for a Jinja-style renderer (presumably an Airflow
    BashOperator; confirm). When executed, the script:

    * creates ``<params.path><path_name>/`` and its ``bkp`` sub-folder,
    * moves every regular file directly in that folder whose name matches
      ``file_pattern`` into ``bkp``, appending ``_<YYYY_MM_DD_HH_MM_SS>`` to
      the file name,
    * deletes files under ``bkp`` that ``find -mtime +n_days`` selects.
      NOTE(review): ``mv`` normally keeps the original mtime, so age is
      measured from the file's last modification, not from when it was
      backed up. Confirm that this is intended.

    Args:
        path_name: Sub-folder name spliced in directly after
            ``{{ params.path }}``. No separator is inserted, so
            ``params.path`` presumably ends with ``/``; verify.

    Returns:
        The script text with ``path_name`` inserted into the ``path=`` line.
    """
    # Script text before and after the sub-folder name. These are raw strings so
    # the `\;` terminating find's -exec reaches bash unchanged.
    script_head = r"""
            # Retrieve parameters
            path="{{ params.path }}"""
    script_tail = r"""/"
            file_pattern="{{ params.file_pattern }}"
            n_days="{{ params.n_days }}"

            # Create path if it doesn't exist
            mkdir -p "${path}"

            # Generate timestamp
            timestamp=$(date +%Y_%m_%d_%H_%M_%S)

            # Create bkp subfolder if it doesn't exist
            bkp_dir="${path}bkp"
            mkdir -p "${bkp_dir}"

            # Debug: List files in path
            echo "Files in ${path}:"
            ls -l "${path}" || echo "No files found or path error"

            # Count matching files
            moved_count=$(find "${path}" -maxdepth 1 -type f -name "${file_pattern}" | wc -l)

            # Move all matching files to bkp, appending timestamp to filename
            find "${path}" -maxdepth 1 -type f -name "${file_pattern}" -exec sh -c '
                base=$(basename "$0")
                mv "$0" "${1}/${base}_${2}"
            ' {} "${bkp_dir}" "${timestamp}" \;

            # Log moved files
            if [ ${moved_count} -gt 0 ]; then
                echo "${moved_count} files moved to ${bkp_dir} successfully."
            else
                echo "No files matching ${file_pattern} found to move."
            fi

            # Delete files in bkp older than n_days
            deleted_count=$(find "${bkp_dir}" -type f -mtime +${n_days} | wc -l)
            find "${bkp_dir}" -type f -mtime +${n_days} -delete

            # Log deleted files
            if [ ${deleted_count} -gt 0 ]; then
                echo "${deleted_count} files older than ${n_days} days in ${bkp_dir} deleted successfully."
            else
                echo "No files older than ${n_days} days found in ${bkp_dir}."
            fi
            """
    return "".join((script_head, path_name, script_tail))


# Feed sub-folder names; each one gets its own archive script.
DIVVY_BIKES_FEEDS = (
    'free_bike_status',
    'station_information',
    'station_status',
    'system_pricing_plan',
    'vehicle_types',
)

# 'move_files_<feed>' -> bash command for that feed. Deriving the key and the
# path_name from the same string keeps them from drifting apart (the
# hand-written entries previously passed a malformed 'vehicle_types)').
DivvyBikesPaths = {
    f'move_files_{feed}': get_bash_command(path_name=feed)
    for feed in DIVVY_BIKES_FEEDS
}

# print() rather than a bare expression so the multi-line script renders
# with real line breaks.
print(DivvyBikesPaths.get('move_files_free_bike_status'))

In [None]:
# Distribute the station records from `data` as an RDD.
# NOTE(review): `data` is not defined in any cell visible in this notebook, so
# this cell only works with leftover kernel state. TODO load it in an earlier cell;
# presumably a parsed JSON payload with a top-level "data" object holding a
# "stations" list. Confirm the source file/endpoint.
# NOTE(review): earlier cells call spark.stop(); make sure a live session exists
# before running this.
rdd = spark.sparkContext.parallelize(data.get('data').get('stations'))

In [None]:
# Read the records in `rdd` through Spark's JSON reader and display the
# first rows.
# NOTE(review): `inferSchema` is documented as a CSV option, not a JSON one;
# it appears to have no effect here. Confirm, and use `primitivesAsString` if
# the goal is to keep values as strings.
# NOTE(review): PySpark converts non-str RDD elements with str(). If the
# stations are Python dicts, True/False/None will not be valid JSON; consider
# `rdd.map(json.dumps)` after confirming the element type.
(
    spark.read
    .option('inferSchema', False)
    .json(rdd)
    .show()
)