In [1]:
!pip install schedule



In [2]:
import json
import time
import pyspark
import schedule
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import StructType, StructField, MapType, StringType

In [3]:
def init_spark_session():    
    try:
        spark = SparkSession.builder \
            .appName("DeltaLakeExample") \
            .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()

        spark.conf.set("spark.hadoop.fs.s3a.access.key", "datalake")
        spark.conf.set("spark.hadoop.fs.s3a.secret.key", "datalake")
        spark.conf.set("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        spark.conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
        spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        print("Sessão Spark criada com sucesso!")

        return spark
    except Exception as err:
        print("Something went wrong trying to create or get spark session!")
        print(f"ERROR: {err}")
        return None

In [4]:
def auth_session(auth_url, apikey_param):
    try:
        session = requests.session()
        session.post(auth_url, params=apikey_param)
        return session
    except Exception as err:
        print("Something went wrong trying to authenticate session")
        print(f"ERROR: {err}")
        return None

In [5]:
def treat_raw_data(df):
    df_exploded = df.withColumn("l", explode(df.l))
    df_semi = df_exploded.withColumn("vs", explode(df_exploded.l.vs))
    
    df_treated = df_semi.withColumnRenamed("hr", "hora") \
                  .withColumn("letreiro_completo", col("l.c")) \
                  .withColumn("cd_linha", col("l.cl")) \
                  .withColumn("sentido", col("l.sl")) \
                  .withColumn("letreiro_destino", col("l.lt0")) \
                  .withColumn("letreiro_origem", col("l.lt1")) \
                  .withColumn("quantidade_veiculos", col("l.qv")) \
                  .withColumn("prefixo", col("vs.p")) \
                  .withColumn("flag_acessivel", col("vs.a")) \
                  .withColumn("utc", col("vs.ta")) \
                  .withColumn("latitude_veiculo", col("vs.py")) \
                  .withColumn("longitude_veiculo", col("vs.px"))

    return df_treated.drop("l","vs")   

In [6]:
def job():
    spark = init_spark_session()
    url = 'http://api.olhovivo.sptrans.com.br/v2.1/Login/Autenticar'
    apikey = {'token': "4878e23ec08fc0d75b1f6c39301e58c8c3a741b5f21dd1221ebacf87225e1cf5"} 
    session = auth_session(url, apikey)

    response = session.get('http://api.olhovivo.sptrans.com.br/v2.1/Posicao')
    json_rdd = spark.sparkContext.parallelize([response.text])
    df = spark.read.json(json_rdd)
    df = treat_raw_data(df)

    df.write.format("delta").mode("append").save("s3a://raw/position")

In [7]:
# schedule.every(1).minutes.do(job)

# while True:
#     schedule.run_pending()
#     time.sleep(1)

In [8]:
job()

Sessão Spark criada com sucesso!


In [9]:
spark = init_spark_session()
df = spark.read.format("delta").load("s3a://raw/position")
print("dados escritos na camada raw!")

Sessão Spark criada com sucesso!
dados escritos na camada raw!
