In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Build (or reuse) the Spark session for this job. The MongoDB Spark connector
# is resolved through spark.jars.packages when the session is first created.
spark = (
    SparkSession.builder
    .appName("bulk_stream_spark")
    .config("spark.logConf", "true")
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.0')
    .getOrCreate()
)

# Session-level SQL settings, applied in this order.
#  - adaptive query execution off
#  - long plan strings not truncated (up to 1000 fields)
#  - column/field names resolved case-sensitively, so schema names such as
#    'credit_URL' must match the JSON keys exactly
for _conf_key, _conf_value in (
    ("spark.sql.adaptive.enabled", False),
    ("spark.sql.debug.maxToStringFields", 1000),
    ('spark.sql.caseSensitive', True),
):
    spark.conf.set(_conf_key, _conf_value)

In [None]:
# Locations used by the stream.
_project_dir = '/home/marcelo/BULK/desafio_streamSpark'

# NOTE(review): `path` appears unused by the cells shown; kept for compatibility.
path = 'data.json'

# Checkpoint directory passed to the streaming sink as checkpointLocation.
checkpoint = _project_dir + '/checkpoint'

In [None]:
def _observation_schema():
    """Build the read schema for the weather observation JSON files.

    The top level holds a single nullable struct, ``current_observation``.
    Every leaf inside it is a nullable string; the numeric columns are cast
    in the projection step. The only nested object is ``image``
    (link/title/url). Field order matches the original schema definition.
    """
    def text_field(name):
        # Nullable string leaf.
        return StructField(name, StringType(), True)

    fields_before_image = (
        '@version', '@xmlns:xsd', '@xmlns:xsi', '@xsi:noNamespaceSchemaLocation',
        'copyright_url', 'credit', 'credit_URL',
        'dewpoint_c', 'dewpoint_f', 'dewpoint_string',
        'disclaimer_url', 'icon_url_base', 'icon_url_name',
    )
    fields_after_image = (
        'latitude', 'location', 'longitude', 'ob_url',
        'observation_time', 'observation_time_rfc822',
        'pressure_in', 'pressure_mb', 'pressure_string',
        'privacy_policy_url', 'relative_humidity', 'station_id',
        'suggested_pickup', 'suggested_pickup_period',
        'temp_c', 'temp_f', 'temperature_string',
        'two_day_history_url', 'visibility_mi', 'weather',
        'wind_degrees', 'wind_dir', 'wind_kt', 'wind_mph', 'wind_string',
        'windchill_c', 'windchill_f', 'windchill_string',
    )

    image_field = StructField(
        'image',
        StructType([text_field(name) for name in ('link', 'title', 'url')]),
        True,
    )
    observation_fields = (
        [text_field(name) for name in fields_before_image]
        + [image_field]
        + [text_field(name) for name in fields_after_image]
    )
    return StructType([StructField('current_observation', StructType(observation_fields), True)])


schema2 = _observation_schema()

In [None]:
# Streaming source: JSON files from the input folder, parsed with schema2.
# Settings:
#  - maxFilesPerTrigger caps each micro-batch at 2 new files
#  - PERMISSIVE mode nulls out fields of malformed records instead of failing
# NOTE(review): the glob '*json' also matches names without a dot before
# "json"; '*.json' may have been intended.
df_json = (
    spark.readStream
    .format("json")
    .schema(schema2)
    .option("maxFilesPerTrigger", 2)
    .option("mode", "PERMISSIVE")
    .load('/home/marcelo/BULK/desafio_streamSpark/files/*json')
)

In [None]:
# Projection: flatten the fields we persist and give them Portuguese column
# names. Temperature and pressure arrive as strings and are cast to double.
_projection = [
    ('current_observation.location', 'string', 'Localizacao'),
    ('current_observation.observation_time_rfc822', 'string', 'Data_Hora'),
    ('current_observation.temp_c', 'double', 'Temperatura'),
    ('current_observation.pressure_in', 'double', 'Pressao'),
    ('current_observation.weather', 'string', 'Clima'),
]
select = df_json.select(
    *[col(source).cast(target_type).alias(alias) for source, target_type, alias in _projection]
)

In [None]:
# Sink: write every micro-batch of projected rows to MongoDB (database
# 'marcelo', collection 'weather'). The checkpoint lets a restarted query
# resume where it stopped.
#
# Output mode is "append":
#  - this query has no aggregation, so "update" would emit exactly the same rows
#  - append is the mode the MongoDB Spark connector documents for streaming writes
#  - Spark may reject Update mode for DataSource V2 sinks that don't explicitly
#    support it
weather_query = (
    select.writeStream
    .format('mongodb')
    .option('spark.mongodb.connection.uri', 'mongodb://127.0.0.1:27017/')
    .option('spark.mongodb.database', 'marcelo')
    .option('spark.mongodb.collection', 'weather')
    .option("checkpointLocation", checkpoint)
    .outputMode("append")
    .start()
)

# Blocks this cell until the query stops or fails. If the kernel is
# interrupted, the query keeps running in the background; call
# weather_query.stop() to end it cleanly.
weather_query.awaitTermination()