### 🧱 Step 1: Initialize SparkSession

We start by creating the Spark session with Delta support enabled to process and store the crypto data.


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Transform and Save Delta") \
    .getOrCreate()


25/05/09 20:54:54 WARN Utils: Your hostname, codespaces-78c943 resolves to a loopback address: 127.0.0.1; using 10.0.11.16 instead (on interface eth0)
25/05/09 20:54:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/09 20:54:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/09 20:54:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### 🧬 Step 2: Convert Pandas to Spark DataFrame

We load the pandas DataFrame using the API function and convert it into a distributed Spark DataFrame for further processing.


In [2]:
import sys
sys.path.append("../src")

from api_utils import get_market_data

btc_df = get_market_data("bitcoin", "usd", 30)

# Convert to Spark
btc_spark_df = spark.createDataFrame(btc_df)

btc_spark_df.printSchema()
btc_spark_df.show(5)


root
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)



                                                                                

+--------------------+-----------------+
|           timestamp|            price|
+--------------------+-----------------+
|2025-04-09 21:05:...| 83004.7027605787|
|2025-04-09 22:04:...|83315.43959137528|
|2025-04-09 23:04:...|83058.34775924393|
|2025-04-10 00:04:...|82595.42668033838|
|2025-04-10 01:04:...|82182.12470713933|
+--------------------+-----------------+
only showing top 5 rows



### 🧹 Step 3: Data Cleaning and Type Casting

We ensure that columns have appropriate types for analysis and storage in Delta format.

In [3]:
from pyspark.sql.functions import col

btc_spark_df_clean = btc_spark_df.select(
    col("timestamp").cast("timestamp"),
    col("price").cast("double")
)

btc_spark_df_clean.printSchema()
btc_spark_df_clean.show(5)


root
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)

+--------------------+-----------------+
|           timestamp|            price|
+--------------------+-----------------+
|2025-04-09 21:05:...| 83004.7027605787|
|2025-04-09 22:04:...|83315.43959137528|
|2025-04-09 23:04:...|83058.34775924393|
|2025-04-10 00:04:...|82595.42668033838|
|2025-04-10 01:04:...|82182.12470713933|
+--------------------+-----------------+
only showing top 5 rows



### 💾 Step 4: Save Cleaned Data to Delta Lake

Finally, we store the transformed DataFrame in the Delta format within Microsoft Fabric's default lakehouse location.


In [None]:
btc_spark_df_clean.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/lakehouse/default/crypto/bitcoin")


Py4JJavaError: An error occurred while calling o58.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more


25/05/09 20:55:10 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
