In [56]:
import findspark

# Root directory of the local Spark distribution. findspark uses it to make the
# bundled `pyspark` package importable from this kernel, so this cell has to run
# before any `pyspark` import.
SPARK_HOME = '/opt/spark/spark-3.2.1-bin-hadoop3.2'
findspark.init(spark_home=SPARK_HOME)

In [57]:
import warnings
warnings.filterwarnings('ignore')

In [58]:
## Import SparkSession
from pyspark.sql import SparkSession

## Create SparkSession
# Default MongoDB location used by the connector for both reads and writes.
# The write cells further down override database/collection through options.
MONGO_DEFAULT_URI = "mongodb://127.0.0.1/company.myCollection"

# getOrCreate() hands back an already-running session if the kernel has one;
# in that case the settings below are not guaranteed to be re-applied.
spark = (
    SparkSession.builder
    .master("local")
    .config("spark.mongodb.input.uri", MONGO_DEFAULT_URI)
    .config("spark.mongodb.output.uri", MONGO_DEFAULT_URI)
    .appName("Spark-Kafka-Spark-MongoDB-Streamlit")
    .getOrCreate()
)

In [59]:
# Batch (non-streaming) read of the `data` topic from the local broker,
# starting at the earliest available offset.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "data",
    "startingOffsets": "earliest",
}
df = spark.read.format("kafka").options(**kafka_options).load()

# Show the fixed Kafka source columns; the payload arrives as binary `value`.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [60]:
from pyspark.sql.types import StructField, IntegerType, StringType, StructType, DoubleType
from pyspark.sql.functions import from_json, col

# Layout of the JSON document carried in each Kafka message value:
# (field name, Spark type, nullable).
_schema_fields = [
    ("id", IntegerType(), False),
    ("first_name", StringType(), True),
    ("job", StringType(), True),
    ("gender", StringType(), True),
    ("salary", DoubleType(), True),
    ("country", StringType(), True),
    ("car_model_year", IntegerType(), True),
    ("car_make", StringType(), True),
]

schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _schema_fields]
)

In [62]:
# Decode the Kafka payload: the binary `value` column is cast to a string,
# parsed as JSON against `schema`, and the resulting struct is flattened so
# that every schema field becomes a top-level column of `df`.
#
# The guard keeps the cell re-runnable: it rebinds `df` to its own output, so
# on a second run `df` no longer has a `value` column and the select would
# fail. When `value` is absent the frame is already parsed and is left as is.
if 'value' in df.columns:
    df = (
        df.select(from_json(col('value').cast('string'), schema).alias('data'))
        .select('data.*')
        # `df` feeds several independent actions below (the raw write and three
        # aggregations). Without caching, each one re-reads the topic and
        # re-parses every message; caching also means they normally work from
        # the same snapshot of the topic.
        .cache()
    )

In [63]:
# Persist the parsed records to MongoDB collection `company.raw`.
# Mode "overwrite" replaces whatever the collection held before.
(
    df.write
    .format("mongo")
    .mode("overwrite")
    .options(database="company", collection="raw")
    .save()
)

In [None]:
# Mean salary per country, with one output column per distinct `gender` value.
# pivot() is given no explicit value list, so Spark first has to compute the
# distinct genders itself.
by_country = df.groupBy('country')
df_analysis = by_country.pivot('gender').agg({'salary': 'mean'})

In [54]:
# Store the salary-by-country/gender table in `company.country_gender_salary`.
# `df_analysis` is rebound by several cells in this notebook, so run the salary
# pivot cell immediately before this one.
(
    df_analysis.write
    .format("mongo")
    .mode("overwrite")
    .options(database="company", collection="country_gender_salary")
    .save()
)

In [45]:
# Head-count per country, with one output column per distinct `gender` value
# (count of `id` values within each country/gender cell).
df_analysis = (
    df.groupBy('country')
    .pivot('gender')
    .agg({'id': 'count'})
)

In [48]:
# Store the head-count-by-country/gender table in `company.country_gender`.
# `df_analysis` is rebound by several cells in this notebook, so run the count
# pivot cell immediately before this one.
(
    df_analysis.write
    .format("mongo")
    .mode("overwrite")
    .options(database="company", collection="country_gender")
    .save()
)

In [35]:
# Number of records per car make, most common make first.
# The dict form of agg() names its result column `count(id)`, which is the
# column the sort refers to.
car_make_counts = df.groupBy('car_make').agg({'id': 'count'})
df_analysis = car_make_counts.orderBy('count(id)', ascending=False)

In [36]:
# Store the per-make counts in `company.car_make_count`.
# `df_analysis` is rebound by several cells in this notebook, so run the
# car-make aggregation cell immediately before this one.
(
    df_analysis.write
    .format("mongo")
    .mode("overwrite")
    .options(database="company", collection="car_make_count")
    .save()
)