In [1]:
# Specify additional jars for Spark jobs
from pyspark.sql import SparkSession

spark_jars = "../jars/*"

spark_packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.2.3'
]

spark = SparkSession.builder \
    .appName("Dataframe using a JDBC Connection") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.extraClassPath", spark_jars) \
    .config("spark.executor.extraClassPath", spark_jars) \
    .config("spark.jars.packages", ",".join(spark_packages)) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

In [6]:
kafka_df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "myConsumerGroup")\
    .load()

kafka_df.printSchema()
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+-----------+--------------------+
|        key|               value|
+-----------+--------------------+
|17179869184|{"InvoiceNo":"498...|
|51539607552|{"InvoiceNo":"518...|
|77309411328|{"InvoiceNo":"532...|
|34359738368|{"InvoiceNo":"508...|
|68719476736|{"InvoiceNo":"527...|
|85899345920|{"InvoiceNo":"536...|
|77309411329|{"InvoiceNo":"532...|
|34359738369|{"InvoiceNo":"508...|
|85899345921|{"InvoiceNo":"536...|
|25769803779|{"InvoiceNo":"503...|
|17179869186|{"InvoiceNo":"498...|
|60129542147|{"InvoiceNo":"523...|
|25769803780|{"InvoiceNo":"503...|
|          4|{"InvoiceNo":"489...|
|68719476739|{"InvoiceNo":"527...|
|77309411331|{"InvoiceNo":"532...|
|34359738372|{"InvoiceNo":"508...|
|17179869187|{"In

In [2]:
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .load("../data/employee.csv")

retail_df.show()

+-------------+----+-------+------+----------+-----+-----------------+
|         name| age| income|gender|department|grade|performance_score|
+-------------+----+-------+------+----------+-----+-----------------+
|  Allen Smith|45.0|    NaN|   NaN|Operations|   G3|              723|
|      S Kumar| NaN|16000.0|     F|   Finance|   G0|              520|
|  Jack Morgan|32.0|35000.0|     M|   Finance|   G2|              674|
|    Ying Chin|45.0|65000.0|     F|     Sales|   G3|              556|
|Dheeraj Patel|30.0|42000.0|     F|Operations|   G2|              711|
|Satyam Sharma| NaN|62000.0|   NaN|     Sales|   G3|              649|
| James Authur|54.0|    NaN|     F|Operations|   G3|               53|
|   Josh Wills|54.0|52000.0|     F|   Finance|   G3|              901|
|     Leo Duck|23.0|98000.0|     M|     Sales|   G4|              709|
+-------------+----+-------+------+----------+-----+-----------------+



In [3]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [4]:
df_mysql = df_mysql.orderBy(df_mysql.lname)

# Displays the dataframe and some of its metadata
df_mysql.show(5)
df_mysql.printSchema()

+--------+--------------+--------------------+
|   lname|         fname|                name|
+--------+--------------+--------------------+
|   Karau|        Holden|       Karau, Holden|
|Maréchal|Pierre Sylvain|Maréchal, Pierre ...|
|  Pascal|        Blaise|      Pascal, Blaise|
|  Perrin|  Jean-Georges|Perrin, Jean-Georges|
|Voltaire|      François|  Voltaire, François|
+--------+--------------+--------------------+
only showing top 5 rows

root
 |-- lname: string (nullable = true)
 |-- fname: string (nullable = true)
 |-- name: string (nullable = true)



In [5]:
# PostgresSQL JDBC Connection
df_postgres = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [6]:
df_postgres = df_postgres.orderBy(df_postgres.lname)

# Displays the dataframe and some of its metadata
df_postgres.show(5)
df_postgres.printSchema()

+--------+--------------+--------------------+
|   lname|         fname|                name|
+--------+--------------+--------------------+
|   Karau|        Holden|       Karau, Holden|
|Maréchal|Pierre Sylvain|Maréchal, Pierre ...|
|  Pascal|        Blaise|      Pascal, Blaise|
|  Perrin|  Jean-Georges|Perrin, Jean-Georges|
|Voltaire|      François|  Voltaire, François|
+--------+--------------+--------------------+
only showing top 5 rows

root
 |-- lname: string (nullable = true)
 |-- fname: string (nullable = true)
 |-- name: string (nullable = true)



In [7]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "tweet") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [8]:
df_mysql.printSchema()

root
 |-- tweet_json: string (nullable = true)
 |-- format: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- actor_display_name: string (nullable = true)
 |-- actor_followers_count: integer (nullable = true)
 |-- actor_friends_count: integer (nullable = true)
 |-- actor_id: string (nullable = true)
 |-- actor_status_count: integer (nullable = true)
 |-- actor_preferred_username: string (nullable = true)
 |-- actor_image_url: string (nullable = true)
 |-- actor_posted_time: timestamp (nullable = true)
 |-- verb: string (nullable = true)
 |-- body: string (nullable = true)
 |-- hashtag_list: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- is_fake_tweet: integer (nullable = true)
 |-- in_reply_to: string (nullable = true)
 |-- posted_time: timestamp (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- generator_display_name: string (nullable = true)
 |-- contains_mention: integer (nullable = 

In [11]:
raw_df = spark.read.format("image") \
    .load('../data/images/')

In [12]:
raw_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [13]:
image_df = raw_df.select("image.origin", "image.height","image.width", "image.nChannels", "image.mode", "image.data")

In [20]:
# Read CSV file
csv_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [21]:
csv_df.count()

525461

In [15]:
retail_df = csv_df.selectExpr("Invoice as InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "Price as UnitPrice", "`Customer ID` as CustomerID", "Country")
retail_df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   489434|    85048|15CM CHRISTMAS GL...|      12|01/12/09 07:45|     6.95|     13085|United Kingdom|
|   489434|   79323P|  PINK CHERRY LIGHTS|      12|01/12/09 07:45|     6.75|     13085|United Kingdom|
|   489434|   79323W| WHITE CHERRY LIGHTS|      12|01/12/09 07:45|     6.75|     13085|United Kingdom|
|   489434|    22041|"RECORD FRAME 7""...|      48|01/12/09 07:45|      2.1|     13085|United Kingdom|
|   489434|    21232|STRAWBERRY CERAMI...|      24|01/12/09 07:45|     1.25|     13085|United Kingdom|
|   489434|    22064|PINK DOUGHNUT TRI...|      24|01/12/09 07:45|     1.65|     13085|United Kingdom|
|   489434|    21871| SAVE THE PLANET MUG|      24|01/12/09 07:45|     1.

In [16]:
# Convert data record ro JSON message
from pyspark.sql.functions import to_json, struct, from_json, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType

kafka_df = retail_df.withColumn("key", monotonically_increasing_id().cast("STRING")).withColumn("value", to_json(struct([retail_df[x] for x in retail_df.columns])).cast("STRING"))

jsonSchema = StructType([ StructField("eventName", StringType(), True), StructField("eventParams", StringType(), True)])
kafka_df.select("key", "value").show()

+---+--------------------+
|key|               value|
+---+--------------------+
|  0|{"InvoiceNo":"489...|
|  1|{"InvoiceNo":"489...|
|  2|{"InvoiceNo":"489...|
|  3|{"InvoiceNo":"489...|
|  4|{"InvoiceNo":"489...|
|  5|{"InvoiceNo":"489...|
|  6|{"InvoiceNo":"489...|
|  7|{"InvoiceNo":"489...|
|  8|{"InvoiceNo":"489...|
|  9|{"InvoiceNo":"489...|
| 10|{"InvoiceNo":"489...|
| 11|{"InvoiceNo":"489...|
| 12|{"InvoiceNo":"489...|
| 13|{"InvoiceNo":"489...|
| 14|{"InvoiceNo":"489...|
| 15|{"InvoiceNo":"489...|
| 16|{"InvoiceNo":"489...|
| 17|{"InvoiceNo":"489...|
| 18|{"InvoiceNo":"489...|
| 19|{"InvoiceNo":"489...|
+---+--------------------+
only showing top 20 rows



In [17]:
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "topic_data") \
    .save()

In [7]:
# Data source
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [8]:
# Save to MySQl JDBC Connection
retail_df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "online_retail_II") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .save()

In [9]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail.csv")

In [10]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail/")

In [11]:
retail_df.write.mode("overwrite") \
    .parquet("../data/retail-parquet/")

In [12]:
retail_df.write.mode("overwrite") \
    .orc("../data/retail-orc/")

In [13]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import col, from_json, to_date

eventSchema = StructType()\
    .add('InvoiceNo', StringType())\
    .add('StockCode', StringType())\
    .add('Description', StringType())\
    .add('Quantity', IntegerType())\
    .add('InvoiceDate', StringType())\
    .add('UnitPrice', DoubleType())\
    .add('CustomerID', IntegerType())\
    .add('Country', StringType())

retail_df = kafka_df.select(from_json(col("value"). cast(StringType()), eventSchema).alias("message"), col("timestamp").alias("EventTime")).select("message.*", "EventTime")

In [15]:
retail_df.printSchema()
retail_df.show(5)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|           EventTime|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------------+
|   498903|    22384|LUNCH BAG PINK RE...|      10|23/02/10 14:16|     1.65|     14112|United Kingdom|2023-03-08 02:13:...|
|   518776|    82600|NO SINGING METAL ...|      12|11/08/10 15:49|      2.1|     15110|United Kingdom|2023-03-08 02:13:...

In [21]:
df1 = kafka_df.select("value").show()
df2 = spark.read.json(df1)
df2.show()


+--------------------+
|               value|
+--------------------+
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
|[7B 22 49 6E 76 6...|
+--------------------+
only showing top 20 rows



TypeError: path can be only string, list or RDD

In [25]:
Kafka_stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "streamConsumerGroup") \
    .load()

In [26]:
Kafka_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
query = Kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()#%%
# Specify additional jars for Spark jobs
from pyspark.sql import SparkSession

spark_jars = "../jars/*"

spark_packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.2.3'
]

spark = SparkSession.builder \
    .appName("Dataframe using a JDBC Connection") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.extraClassPath", spark_jars) \
    .config("spark.executor.extraClassPath", spark_jars) \
    .config("spark.jars.packages", ",".join(spark_packages)) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

In [None]:
kafka_df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "myConsumerGroup")\
    .load()

kafka_df.printSchema()
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

In [None]:
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .load("../data/employee.csv")

retail_df.show()

In [None]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_mysql = df_mysql.orderBy(df_mysql.lname)

# Displays the dataframe and some of its metadata
df_mysql.show(5)
df_mysql.printSchema()

In [None]:
# PostgresSQL JDBC Connection
df_postgres = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_postgres = df_postgres.orderBy(df_postgres.lname)

# Displays the dataframe and some of its metadata
df_postgres.show(5)
df_postgres.printSchema()

In [None]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "tweet") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_mysql.printSchema()

In [None]:
raw_df = spark.read.format("image") \
    .load('../data/images/')

In [None]:
raw_df.printSchema()

In [None]:
image_df = raw_df.select("image.origin", "image.height","image.width", "image.nChannels", "image.mode", "image.data")

In [None]:
# Read CSV file
csv_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [None]:
csv_df.count()

In [None]:
retail_df = csv_df.selectExpr("Invoice as InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "Price as UnitPrice", "`Customer ID` as CustomerID", "Country")
retail_df.show()

In [None]:
# Convert data record ro JSON message
from pyspark.sql.functions import to_json, struct, from_json, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType

kafka_df = retail_df.withColumn("key", monotonically_increasing_id().cast("STRING")).withColumn("value", to_json(struct([retail_df[x] for x in retail_df.columns])).cast("STRING"))

jsonSchema = StructType([ StructField("eventName", StringType(), True), StructField("eventParams", StringType(), True)])
kafka_df.select("key", "value").show()

In [None]:
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "topic_data") \
    .save()

In [None]:
# Data source
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [None]:
# Save to MySQl JDBC Connection
retail_df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "online_retail_II") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .save()

In [None]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail.csv")

In [None]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail/")

In [None]:
retail_df.write.mode("overwrite") \
    .parquet("../data/retail-parquet/")

In [None]:
retail_df.write.mode("overwrite") \
    .orc("../data/retail-orc/")

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import col, from_json, to_date

eventSchema = StructType()\
    .add('InvoiceNo', StringType())\
    .add('StockCode', StringType())\
    .add('Description', StringType())\
    .add('Quantity', IntegerType())\
    .add('InvoiceDate', StringType())\
    .add('UnitPrice', DoubleType())\
    .add('CustomerID', IntegerType())\
    .add('Country', StringType())

retail_df = kafka_df.select(from_json(col("value"). cast(StringType()), eventSchema).alias("message"), col("timestamp").alias("EventTime")).select("message.*", "EventTime")

In [None]:
retail_df.printSchema()
retail_df.show(5)

In [None]:
df1 = kafka_df.select("value").show()
df2 = spark.read.json(df1)
df2.show()


In [None]:
Kafka_stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "streamConsumerGroup") \
    .load()

In [None]:
Kafka_stream_df.printSchema()

In [None]:
query = Kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("memory") \
    .outputMode("append") \
    .queryName("testk1") \
    .start()#%%
# Specify additional jars for Spark jobs
from pyspark.sql import SparkSession

spark_jars = "../jars/*"

spark_packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.2.3'
]

spark = SparkSession.builder \
    .appName("Dataframe using a JDBC Connection") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.extraClassPath", spark_jars) \
    .config("spark.executor.extraClassPath", spark_jars) \
    .config("spark.jars.packages", ",".join(spark_packages)) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

In [None]:
kafka_df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "myConsumerGroup")\
    .load()

kafka_df.printSchema()
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

In [None]:
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .load("../data/employee.csv")

retail_df.show()

In [None]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_mysql = df_mysql.orderBy(df_mysql.lname)

# Displays the dataframe and some of its metadata
df_mysql.show(5)
df_mysql.printSchema()

In [None]:
# PostgresSQL JDBC Connection
df_postgres = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_postgres = df_postgres.orderBy(df_postgres.lname)

# Displays the dataframe and some of its metadata
df_postgres.show(5)
df_postgres.printSchema()

In [None]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "tweet") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_mysql.printSchema()

In [None]:
raw_df = spark.read.format("image") \
    .load('../data/images/')

In [None]:
raw_df.printSchema()

In [None]:
image_df = raw_df.select("image.origin", "image.height","image.width", "image.nChannels", "image.mode", "image.data")

In [None]:
# Read CSV file
csv_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [None]:
csv_df.count()

In [None]:
retail_df = csv_df.selectExpr("Invoice as InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "Price as UnitPrice", "`Customer ID` as CustomerID", "Country")
retail_df.show()

In [None]:
# Convert data record ro JSON message
from pyspark.sql.functions import to_json, struct, from_json, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType

kafka_df = retail_df.withColumn("key", monotonically_increasing_id().cast("STRING")).withColumn("value", to_json(struct([retail_df[x] for x in retail_df.columns])).cast("STRING"))

jsonSchema = StructType([ StructField("eventName", StringType(), True), StructField("eventParams", StringType(), True)])
kafka_df.select("key", "value").show()

In [None]:
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "topic_data") \
    .save()

In [None]:
# Data source
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [None]:
# Save to MySQl JDBC Connection
retail_df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "online_retail_II") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .save()

In [None]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail.csv")

In [None]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail/")

In [None]:
retail_df.write.mode("overwrite") \
    .parquet("../data/retail-parquet/")

In [None]:
retail_df.write.mode("overwrite") \
    .orc("../data/retail-orc/")

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import col, from_json, to_date

eventSchema = StructType()\
    .add('InvoiceNo', StringType())\
    .add('StockCode', StringType())\
    .add('Description', StringType())\
    .add('Quantity', IntegerType())\
    .add('InvoiceDate', StringType())\
    .add('UnitPrice', DoubleType())\
    .add('CustomerID', IntegerType())\
    .add('Country', StringType())

retail_df = kafka_df.select(from_json(col("value"). cast(StringType()), eventSchema).alias("message"), col("timestamp").alias("EventTime")).select("message.*", "EventTime")

In [None]:
retail_df.printSchema()
retail_df.show(5)

In [None]:
df1 = kafka_df.select("value").show()
df2 = spark.read.json(df1)
df2.show()


In [None]:
Kafka_stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "streamConsumerGroup") \
    .load()

In [None]:
Kafka_stream_df.printSchema()

In [None]:
query = Kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()#%%
# Specify additional jars for Spark jobs
from pyspark.sql import SparkSession

spark_jars = "../jars/*"

spark_packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.2.3'
]

spark = SparkSession.builder \
    .appName("Dataframe using a JDBC Connection") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.extraClassPath", spark_jars) \
    .config("spark.executor.extraClassPath", spark_jars) \
    .config("spark.jars.packages", ",".join(spark_packages)) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

In [None]:
kafka_df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "myConsumerGroup")\
    .load()

kafka_df.printSchema()
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

In [None]:
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .load("../data/employee.csv")

retail_df.show()

In [None]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_mysql = df_mysql.orderBy(df_mysql.lname)

# Displays the dataframe and some of its metadata
df_mysql.show(5)
df_mysql.printSchema()

In [None]:
# PostgresSQL JDBC Connection
df_postgres = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/spark_labs") \
    .option("dbtable", "ch02") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_postgres = df_postgres.orderBy(df_postgres.lname)

# Displays the dataframe and some of its metadata
df_postgres.show(5)
df_postgres.printSchema()

In [None]:
# MySQl JDBC Connection
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "tweet") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .load()

In [None]:
df_mysql.printSchema()

In [None]:
raw_df = spark.read.format("image") \
    .load('../data/images/')

In [None]:
raw_df.printSchema()

In [None]:
image_df = raw_df.select("image.origin", "image.height","image.width", "image.nChannels", "image.mode", "image.data")

In [None]:
# Read CSV file
csv_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [None]:
csv_df.count()

In [None]:
retail_df = csv_df.selectExpr("Invoice as InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "Price as UnitPrice", "`Customer ID` as CustomerID", "Country")
retail_df.show()

In [None]:
# Convert data record ro JSON message
from pyspark.sql.functions import to_json, struct, from_json, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType

kafka_df = retail_df.withColumn("key", monotonically_increasing_id().cast("STRING")).withColumn("value", to_json(struct([retail_df[x] for x in retail_df.columns])).cast("STRING"))

jsonSchema = StructType([ StructField("eventName", StringType(), True), StructField("eventParams", StringType(), True)])
kafka_df.select("key", "value").show()

In [66]:
kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "topic_data") \
    .save()

In [None]:
# Data source
retail_df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("sep", ",") \
    .load("../data/online_retail_II.csv")

In [None]:
# Save to MySQl JDBC Connection
retail_df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbase") \
    .option("dbtable", "online_retail_II") \
    .option("user", "dbuser") \
    .option("password", "dbuser") \
    .option("useSSL", "false") \
    .save()

In [None]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail.csv")

In [None]:
retail_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("../data/retail/")

In [None]:
retail_df.write.mode("overwrite") \
    .parquet("../data/retail-parquet/")

In [None]:
retail_df.write.mode("overwrite") \
    .orc("../data/retail-orc/")

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import col, from_json, to_date

eventSchema = StructType()\
    .add('InvoiceNo', StringType())\
    .add('StockCode', StringType())\
    .add('Description', StringType())\
    .add('Quantity', IntegerType())\
    .add('InvoiceDate', StringType())\
    .add('UnitPrice', DoubleType())\
    .add('CustomerID', IntegerType())\
    .add('Country', StringType())

retail_df = kafka_df.select(from_json(col("value"). cast(StringType()), eventSchema).alias("message"), col("timestamp").alias("EventTime")).select("message.*", "EventTime")

In [None]:
retail_df.printSchema()
retail_df.show(5)

In [None]:
df1 = kafka_df.select("value").show()
df2 = spark.read.json(df1)
df2.show()


In [83]:
Kafka_stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "streamConsumerGroup") \
    .load()

In [42]:
|query = Kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("memory") \
    .outputMode("append") \
    .queryName("testk1") \
    .start()

In [43]:
query.awaitTermination(10)

False

In [44]:
query.stop()

In [46]:
query.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [52]:
# Query data
query_result=spark.sql("SELECT * from testk1 LIMIT 10")
query_result.show(10)

+-----------+--------------------+
|        key|               value|
+-----------+--------------------+
|25769803776|{"InvoiceNo":"503...|
|          0|{"InvoiceNo":"489...|
|60129542144|{"InvoiceNo":"523...|
| 8589934592|{"InvoiceNo":"494...|
|42949672961|{"InvoiceNo":"513...|
|25769803777|{"InvoiceNo":"503...|
|17179869185|{"InvoiceNo":"498...|
|68719476737|{"InvoiceNo":"527...|
| 8589934593|{"InvoiceNo":"494...|
|25769803778|{"InvoiceNo":"503...|
+-----------+--------------------+



In [51]:
spark.sql("SELECT COUNT(*) AS cnt FROM testk1").show()

+------+
|   cnt|
+------+
|525461|
+------+



In [53]:
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@7e4a9e25, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$4993/1319113060@323da47d
+- *(1) Project [cast(key#511 as string) AS key#834, cast(value#512 as string) AS value#835]
   +- MicroBatchScan[key#511, value#512, topic#513, partition#514, offset#515L, timestamp#516, timestampType#517] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan




In [54]:
query.recentProgress

[{'id': 'eda1097a-edfc-4b40-aa11-1bcaa5267bd2',
  'runId': 'cfa7b9e7-69f8-4252-ab39-77df0c3a5cbb',
  'name': 'testk1',
  'timestamp': '2023-03-08T11:34:58.003Z',
  'batchId': 0,
  'numInputRows': 525461,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 96414.8623853211,
  'durationMs': {'addBatch': 5170,
   'getBatch': 1,
   'latestOffset': 125,
   'queryPlanning': 7,
   'triggerExecution': 5450,
   'walCommit': 74},
  'stateOperators': [],
  'sources': [{'description': 'KafkaV2[Subscribe[topic_data]]',
    'startOffset': None,
    'endOffset': {'topic_data': {'2': 175458, '1': 175334, '0': 174669}},
    'latestOffset': {'topic_data': {'2': 175458, '1': 175334, '0': 174669}},
    'numInputRows': 525461,
    'inputRowsPerSecond': 0.0,
    'processedRowsPerSecond': 96414.8623853211,
    'metrics': {'avgOffsetsBehindLatest': '0.0',
     'maxOffsetsBehindLatest': '0',
     'minOffsetsBehindLatest': '0'}}],
  'sink': {'description': 'MemorySink', 'numOutputRows': 525461}},
 {'id': '

In [55]:
query.lastProgress

{'id': 'eda1097a-edfc-4b40-aa11-1bcaa5267bd2',
 'runId': 'cfa7b9e7-69f8-4252-ab39-77df0c3a5cbb',
 'name': 'testk1',
 'timestamp': '2023-03-08T11:35:23.463Z',
 'batchId': 1,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 1, 'triggerExecution': 1},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[topic_data]]',
   'startOffset': {'topic_data': {'2': 175458, '1': 175334, '0': 174669}},
   'endOffset': {'topic_data': {'2': 175458, '1': 175334, '0': 174669}},
   'latestOffset': {'topic_data': {'2': 175458, '1': 175334, '0': 174669}},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0,
   'metrics': {'avgOffsetsBehindLatest': '0.0',
    'maxOffsetsBehindLatest': '0',
    'minOffsetsBehindLatest': '0'}}],
 'sink': {'description': 'MemorySink', 'numOutputRows': 0}}

In [56]:
spark.streams.active

[]

In [57]:
spark.streams.awaitAnyTermination()

StreamingQueryException: Query [id = 63f4bebd-e656-4ca8-9341-4e5c50523d32, runId = dd6ca762-342d-43ec-af7a-a5fb2f012e6e] terminated with exception: Set(topic_data-2) are gone. Kafka option 'kafka.group.id' has been set on this query, it is
 not recommended to set this option. This option is unsafe to use since multiple concurrent
 queries or sources using the same group id will interfere with each other as they are part
 of the same consumer group. Restarted queries may also suffer interference from the
 previous run having the same group id. The user should have only one query per group id,
 and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
 consumers from the previous query are marked dead by the Kafka group coordinator before the
 restarted query starts running.
    . 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
    

In [74]:
query = Kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .queryName("task1") \
    .option("partition.assignment.strategy", "range") \
    .trigger(availableNow=True) \
    .start()

In [75]:
query.awaitTermination()

In [78]:
spark.sql("SELECT COUNT(*) AS cnt FROM task1").show()

+------+
|   cnt|
+------+
|525461|
+------+



In [77]:
query.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

Loading data incrementally

In [2]:
from pyspark.sql import SparkSession
spark_jars = "../jars/*"

spark_packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.2.3'
]

spark = SparkSession.builder \
    .appName("Dataframe using a JDBC Connection") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.extraClassPath", spark_jars) \
    .config("spark.executor.extraClassPath", spark_jars) \
    .config("spark.jars.packages", ",".join(spark_packages)) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

In [3]:
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_data") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "incConsumerGroup") \
    .load()

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import col, from_json, to_date

eventSchema = StructType() \
    .add('InvoiceNo', StringType()) \
    .add('StockCode', StringType()) \
    .add('Description', StringType()) \
    .add('Quantity', IntegerType()) \
    .add('InvoiceDate', StringType()) \
    .add('UnitPrice', DoubleType()) \
    .add('CustomerID', IntegerType()) \
    .add('Country', StringType())

retail_df = kafka_df.select(from_json(col("value"). cast(StringType()), eventSchema).alias("message"), col("timestamp").alias("EventTime")).select("message.*", "EventTime")

In [11]:
base_path = "../data-lake/retail_events-partition"
retail_df.withColumn("EventDate", to_date(retail_df.EventTime)) \
    .writeStream \
    .format('parquet') \
    .outputMode("append") \
    .partitionBy("InvoiceDate","Country") \
    .trigger(once=True) \
    .option('checkpointLocation', base_path + '/_checkpoint') \
    .start(base_path)

<pyspark.sql.streaming.StreamingQuery at 0x12b734100>

In [13]:
written_retail_df = spark.read.format("parquet") \
    .("InvoiceDate","Country") \
    .load(base_path)

AttributeError: 'DataFrameReader' object has no attribute 'partitionBy'

In [10]:
written_retail_df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|           EventTime| EventDate|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+--------------------+----------+
|   503782|    22331|WOODLAND PARTY BA...|      16|07/04/10 11:59|     1.65|     12711|       Germany|2023-03-08 02:13:...|2023-03-08|
|   489434|    85048|15CM CHRISTMAS GL...|      12|01/12/09 07:45|     6.95|     13085|United Kingdom|2023-03-08 02:13:...|2023-03-08|
|   523645|    21790|  VINTAGE SNAP CARDS|       5|23/09/10 12:19|     0.85|     12838|United Kingdom|2023-03-08 02:13:...|2023-03-08|
|   494014|    20714|    POSY SHOPPER BAG|      10|11/01/10 09:33|     0.87|      null|United Kingdom|2023-03-08 02:13:...|2023-03-08|
|   513647|    21975|PACK OF 60 DINOSA...|     120|28/0