In [142]:
from functools import partial
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, flatten, explode, struct, create_map, array
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, TimestampType


In [143]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType

In [113]:
spark = SparkSession.builder.appName('chap_4').master("local[*]").getOrCreate()

In [144]:
from faker import Faker
import pandas as pd
fake = Faker()
def generate_data(num):
    row = [{"name":fake.name(),
           "address":fake.address(),
           "city":fake.city(),
           "state":fake.state(),
           "purchase_date":fake.date_time(),
            "purchase_id":fake.pyfloat(),
             "sales":fake.pyfloat()
           }]
    return row
panda = pd.DataFrame(generate_data(2))
fake_data = spark.createDataFrame(panda)
fake_data.write.format("parquet").mode("append").save("/Users/saisundarmasetty/Documents/data_architect_ws/chap3_lab_data/")

In [145]:
# Create a bronze table from the dataset. The data should be streaming but set up to trigger once.
location = "/Users/saisundarmasetty/Documents/data_architect_ws/chap3_lab_data/"
format = "parquet"
schema = spark.read.format(format).load(location).schema
users = spark.readStream.schema(schema).format(format).load(location)

In [146]:
users.printSchema()

root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- purchase_date: timestamp (nullable = true)
 |-- purchase_id: double (nullable = true)
 |-- sales: double (nullable = true)



In [147]:
bronze_schema = users.schema
bronze_location = "/Users/saisundarmasetty/Documents/data_architect_ws/chap4_lab_bronze/"
bronze_format = "parquet"
checkpoint_location = f"{bronze_location}/_checkpoint"
output_mode = "append"
bronze_query = users.writeStream.format(bronze_format).trigger(once=True).option("checkpointLocation", checkpoint_location).option("path", bronze_location).outputMode(output_mode).start()


25/07/02 19:24:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [148]:
spark.read.format(bronze_format).schema(users.schema).load(bronze_location).show()


+----------------+--------------------+---------------+----------+--------------------+-------------------+-------------------+
|            name|             address|           city|     state|       purchase_date|        purchase_id|              sales|
+----------------+--------------------+---------------+----------+--------------------+-------------------+-------------------+
|   Sylvia Obrien|88043 William Mou...|  New Jaimestad|      Utah|1998-10-23 06:14:...| 3.4507044394328E13|2.53867269249356E11|
|Katherine Fuller|5694 Reilly Mount...|Christopherside|Washington|1994-03-17 10:37:...|-8.38804796858555E9|   480.211623671276|
|    David Nelson|4837 Wright Stati...| Christineburgh|   Vermont|1998-01-23 01:24:...|  -8152.74306742652| 8.2259285471756E12|
+----------------+--------------------+---------------+----------+--------------------+-------------------+-------------------+



our silver table, we are going add the following columns:

full_address: A combination of the address, city, and state
Id: Round up/down purchase_id and find the absolute value
Create and use a UDF to create a first_name column.

In [149]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, lit, struct, concat, col, abs, floor

In [150]:
bronze_location = "/Users/saisundarmasetty/Documents/data_architect_ws/chap4_lab_bronze/"
schema = spark.read.format("parquet").load(bronze_location).schema
user_bronze = spark.readStream.format("parquet").schema(schema).load(bronze_location)

In [151]:
#creating the UDF function which deals with the datamanipulation
@udf(returnType = StringType())
def strip_name(x):
    return x.split()[0]

In [152]:
#Full address column , rounding up
address_columns = ["address","city","state"]
clean = user_bronze.select(col("*"),concat(*address_columns).alias("full_address"),floor(abs("purchase_id")).alias("id"),strip_name("name").alias("first_name"))

In [153]:
silver_location = "/Users/saisundarmasetty/Documents/data_architect_ws/chap4_lab_silver/"
silver_checkpoint = f"{silver_location}/_checkpoint"
format = "parquet"
output_mode = "append"
clean.writeStream.format(format).option("checkpointLocation","silver_checkpoint").option("path","silver_location").trigger(once=True).outputMode(output_mode).start()

25/07/02 19:24:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x128add130>

In [154]:
spark.read.format("parquet").load("silver_location").show(truncate = False)

+----------------+---------------------------------------------------------+---------------+----------+--------------------------+-------------------+-------------------+--------------------------------------------------------------------------+--------------+----------+
|name            |address                                                  |city           |state     |purchase_date             |purchase_id        |sales              |full_address                                                              |id            |first_name|
+----------------+---------------------------------------------------------+---------------+----------+--------------------------+-------------------+-------------------+--------------------------------------------------------------------------+--------------+----------+
|Sylvia Obrien   |88043 William Mountain Suite 486\nEast Lynnland, LA 91295|New Jaimestad  |Utah      |1998-10-23 06:14:58.553893|3.4507044394328E13 |2.53867269249356E11|88043 William 

Gold Table : MAX, MIN sales and states


In [155]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lit, struct, sum,avg,max, min
silver_location = "/Users/saisundarmasetty/Documents/data_architect_ws/chap4_lab_bronze/"
schema = spark.read.format("parquet").load(silver_location).schema
users_silver = spark.readStream.format("parquet").schema(schema).load(silver_location)


In [156]:
pip install delta-spark

Note: you may need to restart the kernel to use updated packages.


In [157]:
from pyspark.sql import SparkSession
from delta import *

# Create a SparkSession with Delta Lake support
builder = SparkSession.builder.appName("DeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("Spark and Delta Lake are configured.")

Spark and Delta Lake are configured.


25/07/02 19:24:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [160]:

gold_agg = users_silver.groupBy("state").agg(min("sales").alias("minimum_sales"),max("sales").alias("maximum_sales"),avg("sales").alias("avg_sales"))
gold_location = "/Users/saisundarmasetty/Documents/data_architect_ws/chap4_lab_gold/"
gold_checkpoint_location = f"{gold_location}/_checkpoint"
format = "delta"
gold_agg.writeStream.format(format).option("checkpointLocation",gold_checkpoint_location).trigger(once=True).option("path",gold_location).outputMode("append").start()


Py4JJavaError: An error occurred while calling o1742.start.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Make sure the provider name is correct and the package is properly registered and compatible with your Spark version. SQLSTATE: 42K02
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:722)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:681)
	at org.apache.spark.sql.classic.DataStreamWriter.startInternal(DataStreamWriter.scala:247)
	at org.apache.spark.sql.classic.DataStreamWriter.start(DataStreamWriter.scala:136)
	at jdk.internal.reflect.GeneratedMethodAccessor105.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$6(DataSource.scala:665)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:665)
	at scala.util.Failure.orElse(Try.scala:230)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:665)
	... 13 more


In [None]:
spark.read.format(format).load(gold_location).show()