## Project Objective
The objective of this data engineering project is to build a scalable, end-to-end data pipeline that ingests, processes, and enriches Bitcoin’s daily market data. This pipeline will serve as the foundation for analytical reporting, time-series feature engineering, and predictive modeling related to Bitcoin's market behavior.

## Benefits
For a Bitcoin investment advisory firm, the pipeline offers the following benefits:
- The advisory team can trust the data for research, reports, and investor recommendations without manual wrangling or risk of errors.
- Helps the firm detect trends, shifts, and patterns in the Bitcoin market, so analysts can make better-informed buy/sell/hold calls.
- Enables risk analysts to flag abnormal behavior early, protecting clients and improving the firm's credibility.
- Enhances the firm’s client reporting and supports data-driven decision-making.
- Reduces operational overhead, freeing up analysts to focus on strategy instead of data prep.

## Description
This notebook handles the ingestion of raw Bitcoin daily market data. It reads the dataset from a CSV file, infers the schema, performs an initial inspection, and writes the result to the raw (bronze) layer as a Delta table for further processing.

## Tools & Technologies Used in This Notebook
| Tool / Library | Purpose |
|----------------|---------|
| **Apache Spark (PySpark)** | Distributed data processing for ingesting and handling large datasets |
| **Databricks / Jupyter Notebook** | Interactive environment for writing and testing code |
| **Delta Lake / Parquet** | Optimized storage format for raw (bronze) layer |
| **Databricks volume or cloud storage (here `/Volumes/bitcoin/market_data/coin/`)** | Storage location for the input dataset |

In [0]:
#%fs ls /Volumes/bitcoin/market_data/coin/Bitcoin_history_data.csv    # Show metadata (path, name, size, modification time) for the source CSV file


path,name,size,modificationTime
dbfs:/Volumes/bitcoin/market_data/coin/Bitcoin_history_data.csv,Bitcoin_history_data.csv,335981,1753691038000
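
The listing reports `size` in bytes and `modificationTime` as a Unix epoch timestamp in milliseconds. As a quick sanity check before ingestion, a minimal sketch like the following converts both into readable values. The helper name `describe_file` is hypothetical, and the two numbers are copied from the listing above.

```python
from datetime import datetime, timezone


def describe_file(size_bytes: int, modification_time_ms: int) -> dict:
    """Convert raw listing fields into readable values (hypothetical helper)."""
    # The listing gives milliseconds since the epoch; fromtimestamp expects seconds.
    modified = datetime.fromtimestamp(modification_time_ms / 1000, tz=timezone.utc)
    return {
        "size_kib": round(size_bytes / 1024, 1),
        "modified_utc": modified.isoformat(),
    }


# Values copied from the file listing above
info = describe_file(335981, 1753691038000)
print(info)
```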


In [0]:
# Read the Bitcoin history CSV file into a Spark DataFrame
# - header=True: uses the first row as column names
# - inferSchema=True: automatically infers data types for each column
df_raw = spark.read.csv("/Volumes/bitcoin/market_data/coin/Bitcoin_history_data.csv", header=True, inferSchema=True)

# Display the schema of the DataFrame to understand the structure and data types
df_raw.printSchema()

# Show the first 5 rows of the DataFrame for a quick preview of the data
df_raw.show(5)

# Write the raw DataFrame to a Delta Lake table in the 'bronze' layer
# - format("delta"): saves the data in Delta format for ACID transactions and versioning
# - mode("overwrite"): replaces any existing data in the target table
# - saveAsTable(): registers the table in the metastore under the specified name
df_raw.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("bitcoin.market_data.bronze_bitcoin")
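
`inferSchema=True` makes Spark take an extra pass over the file, and the inferred types can shift if the file's contents change. One common alternative is to pass an explicit schema; `spark.read.csv` accepts a DDL-formatted string for its `schema` argument. The sketch below is not part of the original notebook: the column names and types in `ASSUMED_COLUMNS` are hypothetical placeholders (the real CSV header is not shown here) and should be replaced with whatever `df_raw.printSchema()` reports. The helper names are also illustrative.

```python
# Hypothetical column names and types: the real CSV header is not shown in
# this notebook, so adjust these to match the output of df_raw.printSchema().
ASSUMED_COLUMNS = {
    "Date": "DATE",
    "Open": "DOUBLE",
    "High": "DOUBLE",
    "Low": "DOUBLE",
    "Close": "DOUBLE",
    "Volume": "DOUBLE",
}


def build_ddl_schema(columns: dict) -> str:
    """Join column/type pairs into a DDL string such as 'a INT, b DOUBLE'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns.items())


def read_bitcoin_csv(spark, path: str):
    """Read the CSV with a fixed schema instead of inferSchema=True.

    `spark` is the active SparkSession provided by the notebook.
    """
    return spark.read.csv(path, header=True, schema=build_ddl_schema(ASSUMED_COLUMNS))


print(build_ddl_schema(ASSUMED_COLUMNS))
```

In the notebook, this would be called as `read_bitcoin_csv(spark, "/Volumes/bitcoin/market_data/coin/Bitcoin_history_data.csv")` once the placeholder columns match the real file.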


In [0]:
import dlt
from pyspark.sql.functions import *

@dlt.table(
    name="bronze_bitcoin_historical",
    comment="Raw Bitcoin historical data ingested from CSV into the bronze layer"
)
def ingest_bitcoin_data():
    # Read the Bitcoin CSV file from DBFS
    df_raw = spark.read.csv(
        "/Volumes/bitcoin/market_data/coin/Bitcoin_history_data.csv",
        header=True,
        inferSchema=True
    )
    
    return df_raw
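
The `dlt` module is supplied by the pipeline runtime, so running this cell in an interactive notebook session typically fails with `ModuleNotFoundError`, which is what the traceback after this cell shows. Below is a minimal sketch of a guard, assuming the notebook should stay runnable in both contexts; `dlt_is_available` is a hypothetical helper name, not a Databricks API.

```python
import importlib.util


def dlt_is_available() -> bool:
    """Return True only when a module named `dlt` can be found in this runtime."""
    return importlib.util.find_spec("dlt") is not None


if dlt_is_available():
    import dlt  # importable when the notebook runs as part of a pipeline
else:
    dlt = None
    print("dlt is not importable here; skipping the @dlt.table definition.")
```

To execute the `@dlt.table` definition itself, the notebook would most likely need to be attached to a pipeline as source code rather than run cell by cell.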


---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
File <command-6740942322275571>, line 1
----> 1 import dlt
      2 from pyspark.sql.functions import *
      4 @dlt.table(
      5     name="bronze_bitcoin",
      6     comment="Raw Bitcoin historical data ingested from CSV into the bronze layer"
      7 )
      8 def ingest_bitcoin_data():
      9     # Read t