In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split


# Session used by the batch part of this notebook; getOrCreate() hands back
# an already-running session if there is one, otherwise it builds a new one
# registered under the application name "velib-stream".
spark = SparkSession.builder.appName("velib-stream").getOrCreate()

In [2]:
# Batch-read the station status snapshot from HDFS. No explicit schema is
# supplied here, so Spark infers one from the JSON content.
velib_json = spark.read.json("hdfs://pi1:9000/station_status.json")

In [3]:
import pyspark.sql
import pyspark
from pyspark.sql import functions as F
# Print the inferred schema, then preview a single row with long values
# truncated.
velib_json.printSchema()
velib_json.show(n=1, truncate=True)

root
 |-- data: struct (nullable = true)
 |    |-- stations: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- is_installed: long (nullable = true)
 |    |    |    |-- is_renting: long (nullable = true)
 |    |    |    |-- is_returning: long (nullable = true)
 |    |    |    |-- last_reported: long (nullable = true)
 |    |    |    |-- numBikesAvailable: long (nullable = true)
 |    |    |    |-- numDocksAvailable: long (nullable = true)
 |    |    |    |-- num_bikes_available: long (nullable = true)
 |    |    |    |-- num_bikes_available_types: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- ebike: long (nullable = true)
 |    |    |    |    |    |-- mechanical: long (nullable = true)
 |    |    |    |-- num_docks_available: long (nullable = true)
 |    |    |    |-- stationCode: string (nullable = true)
 |    |    |    |-- station_id: long (nullable = true)
 |-- lastUp

In [4]:
# Stage 1: explode the nested data.stations array so that every station
# becomes its own row, held in a single struct column named "data".
velib_format = velib_json.select(F.explode("data.stations").alias("data"))

velib_format.show(2, truncate=False)
velib_format.printSchema()

# Stage 2: flatten the station struct into plain top-level columns.
# num_bikes_available_types is an array of structs; in the rows previewed by
# the show() above, element 0 carries `mechanical` and element 1 carries
# `ebike`, hence the indices 0 and 1 below.
# NOTE(review): this relies on the feed always emitting the two entries in
# that order -- confirm against the data provider.
station = F.col("data")
bike_types = station["num_bikes_available_types"]

velib_format = velib_format.select(
    station["station_id"].alias("station_id"),
    station["num_docks_available"].alias("num_docks_available"),
    station["num_bikes_available"].alias("num_bikes_available"),
    bike_types["ebike"][1].alias("num_ebike_available"),
    bike_types["mechanical"][0].alias("num_mechanical_available"),
    station["stationCode"].alias("stationCode"),
)

velib_format.show(3, truncate=False)
velib_format.printSchema()

+---------------------------------------------------------------------+
|data                                                                 |
+---------------------------------------------------------------------+
|[1, 1, 1, 1608703580, 9, 25, 9, [[, 2], [7,]], 25, 16107, 213688169] |
|[1, 1, 1, 1608703854, 38, 16, 38, [[, 33], [5,]], 16, 6015, 99950133]|
+---------------------------------------------------------------------+
only showing top 2 rows

root
 |-- data: struct (nullable = true)
 |    |-- is_installed: long (nullable = true)
 |    |-- is_renting: long (nullable = true)
 |    |-- is_returning: long (nullable = true)
 |    |-- last_reported: long (nullable = true)
 |    |-- numBikesAvailable: long (nullable = true)
 |    |-- numDocksAvailable: long (nullable = true)
 |    |-- num_bikes_available: long (nullable = true)
 |    |-- num_bikes_available_types: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- ebike: long (nullable =

In [8]:
# Persist the flattened station table to HDFS as header-less CSV.
# "csv" is the data source built into Spark since 2.0; the former
# "com.databricks.spark.csv" name is only a legacy alias for it.
# No save mode is set, so the default applies: the write fails if the target
# directory already exists (e.g. when this cell is re-run).
(
    velib_format.write
    .format("csv")
    .option("header", "false")
    .save("hdfs://pi1:9000/velib-data")
)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Session for the streaming part of the notebook, requesting the Kafka
# source package through spark.jars.packages.
# NOTE(review): getOrCreate() returns an existing session when one is already
# running in this kernel; in that case the package setting below may not be
# applied -- confirm this cell is executed on a fresh kernel.
# NOTE(review): the artifact coordinates (Scala 2.12 / 3.0.1) presumably have
# to match the installed Spark build -- verify.
spark_kafka = SparkSession.builder.appName("spark_kafka").config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1",
).getOrCreate()


In [33]:
from pyspark.sql.types import *

# Explicit schema for the station_status JSON payload, used to parse the
# Kafka message value with from_json.
#
# The per-station fields mirror the schema Spark infers when the same payload
# is read in batch mode from HDFS:
#   * the is_* flags are integers in the feed (inferred as long), so they are
#     declared as numbers rather than booleans, which could not parse them;
#   * every key is declared exactly once and spelled as in the JSON
#     ("is_installed"), because a misspelled key never matches and stays null;
#   * stationCode is included because the batch pipeline selects it.
_station_struct = StructType([
    StructField("is_installed", LongType()),
    StructField("is_renting", LongType()),
    StructField("is_returning", LongType()),
    StructField("last_reported", LongType()),
    StructField("numBikesAvailable", LongType()),
    StructField("numDocksAvailable", LongType()),
    StructField("num_bikes_available", LongType()),
    StructField("num_bikes_available_types", ArrayType(
        StructType([
            StructField("ebike", LongType()),
            StructField("mechanical", LongType()),
        ])
    )),
    StructField("num_docks_available", LongType()),
    StructField("stationCode", StringType()),
    StructField("station_id", LongType()),
])

velib_schema = StructType([
    StructField("data", StructType([
        StructField("stations", ArrayType(_station_struct)),
    ])),
    # NOTE(review): the top-level fields are kept as originally declared;
    # their inferred types are not visible in the batch schema printout --
    # confirm IntegerType is wide enough for lastUpdatedOther.
    StructField("lastUpdatedOther", IntegerType()),
    StructField("ttl", IntegerType()),
])

In [24]:
from pyspark.sql import functions as F
import json
# Subscribe to the "velib-data" Kafka topic as a streaming DataFrame,
# starting from the earliest available offsets.
df = (
    spark_kafka.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "pi1:9092",
        "subscribe": "velib-data",
        "startingOffsets": "earliest",
    })
    .load()
)

df.printSchema()

# The Kafka source exposes the message value as binary; cast it to a string
# so it can be parsed as JSON.
velib_json = df.selectExpr("CAST(value AS STRING)")

# Parse the JSON text with the explicit schema, then promote the fields of
# the parsed struct to top-level columns.
parsed_payload = F.from_json(F.col("value"), velib_schema).alias("data")
velib_format = velib_json.select(parsed_payload).select("data.*")

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

