In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


import time

# Kafka broker address and the topic that song records are consumed from.
kafka_bootstrap_servers, kafka_topic_name = 'localhost:9092', "songTopic"



In [2]:
# Local Spark session on all available cores; driver logging is reduced to errors only.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spotify Streaming Reccomendation System")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

    

In [3]:
# Streaming DataFrame subscribed to the Kafka topic named by `kafka_topic_name`.
# startingOffsets="latest": a fresh query (no checkpoint) only sees records produced
# after it starts, not the topic's existing backlog.
songs_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
)

print("Printing Schema of songs_df: ")
songs_df.printSchema()

    

Printing Schema of songs_df: 
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Kafka delivers the payload as binary: decode it to text and keep the record timestamp.
songs_df1 = songs_df.selectExpr(
    "CAST(value AS STRING)",
    "timestamp",
)


# DDL schema (column order matters) for one CSV-encoded song record.
# Adjacent string literals are concatenated implicitly; the irregular spacing after
# some commas does not matter to Spark's DDL parser.
songs_schema_string = (
    "id STRING, name STRING,popularity INT, duration_ms DOUBLE, explicit INT, "
    "artists STRING, id_artists STRING, release_date STRING, "
    "danceability DOUBLE,"
    "energy DOUBLE, key INT, loudness DOUBLE, "
    "mode INT,"
    "speechiness DOUBLE,"
    "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, "
    "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"
)



# Parse each decoded line into a struct column named "song" using the DDL schema.
# NOTE(review): from_csv runs with default options (comma separator); an unquoted comma
# inside a field would misalign later columns — confirm the producer's CSV format.
song_struct_col = from_csv(col("value"), songs_schema_string).alias("song")
songs_df2 = songs_df1.select(song_struct_col, "timestamp")

# Flatten the struct into top-level columns, keeping the Kafka timestamp alongside.
songs_df3 = songs_df2.select("song.*", "timestamp")
songs_df3.printSchema()



root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- explicit: integer (nullable = true)
 |-- artists: string (nullable = true)
 |-- id_artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [5]:
# Expose the parsed stream as the SQL view "song_find" and continuously append its
# (name, artists) projection to the in-memory table "testeTable".
# The memory sink collects all output on the driver, so it is only suitable for
# small, exploratory volumes like this demo.
songs_df3.createOrReplaceTempView("song_find")
song_find_text = spark.sql("SELECT name, artists FROM song_find")
songs_agg_write_stream = (
    song_find_text.writeStream
    .trigger(processingTime='5 seconds')
    .outputMode("append")
    .format("memory")
    .queryName("testeTable")
    .start()
)

# Wait up to 5 seconds (returns earlier only if the query terminates); the query
# keeps running in the background afterwards.
songs_agg_write_stream.awaitTermination(5)

print("Songs Streaming...")

Songs Streaming...


In [8]:
# Snapshot of the rows the "testeTable" memory sink holds at this moment; re-run to refresh.
snapshot_sql = "SELECT * FROM testeTable"
sqlDF = spark.sql(snapshot_sql)
sqlDF.show()

+--------------------+--------------+
|                name|       artists|
+--------------------+--------------+
|  Old Fashioned Girl|    GregFieler|
|Martín Fierro - R...|IgnacioCorsini|
|Capítulo 2.8 - Ba...|FernandoPessoa|
|Capítulo 2.25 - B...|FernandoPessoa|
|            Lazy Boi|           Uli|
|Tu Verras Montmartre|   LucienBoyer|
|Elle Prend L'boul...|     FlixMayol|
+--------------------+--------------+



In [None]:
# Print the (name, artists) projection of the song stream to the console every 5 seconds.
# With no aggregation in the query, "update" mode emits the same rows as "append".
# NOTE: this rebinds songs_agg_write_stream; any query previously bound to that name
# keeps running and is only reachable through spark.streams.active.
songs_df3.createOrReplaceTempView("song_find")
song_find_text = spark.sql("SELECT name, artists FROM song_find")
songs_agg_write_stream = (
    song_find_text.writeStream
    .trigger(processingTime='5 seconds')
    .outputMode("update")
    .option("truncate", "false")
    .format("console")
    .start()
)

# Announce before blocking: awaitTermination() without a timeout only returns once the
# query stops (or raises if it fails), so a message printed afterwards never shows
# while the stream is running.
print("Songs Streaming...")
songs_agg_write_stream.awaitTermination()

In [None]:
# Stop every streaming query still active in this session, not only the one currently
# bound to songs_agg_write_stream. Each sink cell rebinds that name, so an earlier query
# (e.g. the memory sink feeding "testeTable") would otherwise keep consuming Kafka.
for active_query in spark.streams.active:
    active_query.stop()

In [None]:
def csv_output(path='answ', checkpoint_location="checkpoint_path", trigger_interval='5 seconds'):
    """Continuously write (name, artists) rows of the "song_find" view as CSV files.

    The "song_find" temporary view must already be registered, e.g. with
    ``songs_df3.createOrReplaceTempView("song_find")``.

    Args:
        path: Output directory for the CSV part files. Defaults to 'answ'.
        checkpoint_location: Directory where Spark records streaming progress; reusing
            it resumes from the last committed batch. Defaults to "checkpoint_path".
        trigger_interval: Micro-batch processing interval. Defaults to '5 seconds'.

    Blocks until the streaming query stops. Returns None; if the query terminates with
    an error, awaitTermination() re-raises it.
    """
    song_find_text = spark.sql("SELECT name, artists FROM song_find")
    songs_agg_write_stream = (
        song_find_text.writeStream
        .trigger(processingTime=trigger_interval)
        .outputMode("append")
        .option("path", path)
        .option("checkpointLocation", checkpoint_location)
        .format("csv")
        .start()
    )

    # Print before blocking: awaitTermination() without a timeout does not return
    # while the query is running.
    print("Songs Streaming...")
    songs_agg_write_stream.awaitTermination()