In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

import os

# JARs shipped with the Spark application; this one provides the ClickHouse
# JDBC driver class referenced by `driver` below.
jar_files = [
    "/util/clickhouse-jdbc-0.3.2-all.jar"
]

# Credentials/endpoints may be overridden through environment variables so
# secrets do not have to live in the notebook; the defaults are the values
# previously hard-coded here.
s3a_access_key = os.environ.get("MINIO_ACCESS_KEY", "minio")
s3a_secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
s3a_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")

# Initialize Spark session with JARs, Delta Lake SQL support and S3A (MinIO)
# access. NOTE(review): the Delta and hadoop-aws classes are not in
# `jar_files`, so they are presumably preinstalled on the cluster — confirm.
# getOrCreate() reuses an already-running session, in which case static
# settings such as spark.jars / spark.sql.extensions are not re-applied;
# restart the kernel after changing them.
spark = SparkSession.builder \
    .appName("test-jdbc") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", ",".join(jar_files)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", s3a_endpoint) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()


# ClickHouse JDBC connection settings used by the write cells below.
url = os.environ.get("CLICKHOUSE_URL", "jdbc:ch://clickhouse:8123/cameraaberta")
user = os.environ.get("CLICKHOUSE_USER", "admin")
password = os.environ.get("CLICKHOUSE_PASSWORD", "admin")
driver = "com.clickhouse.jdbc.ClickHouseDriver"

# `display` is provided by the IPython/Jupyter kernel.
display(spark)

In [None]:
# stops

# Load the GTFS stops file: first line is the header, column types inferred.
stops_path = "s3a://raw/stops.txt"
df = spark.read.csv(stops_path, header=True, inferSchema=True)

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()

# Expose the DataFrame to Spark SQL under the name "stops" and sanity-query it.
df.createOrReplaceTempView('stops')
spark.sql('select * from stops limit 5').show()

In [None]:
# trips

# Load the GTFS trips file: first line is the header, column types inferred.
trips_path = "s3a://raw/trips.txt"
df = spark.read.csv(trips_path, header=True, inferSchema=True)

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()



In [None]:
# stop-times

# Load the GTFS stop_times file: first line is the header, column types inferred.
stop_times_path = "s3a://raw/stop_times.txt"
df = spark.read.csv(stop_times_path, header=True, inferSchema=True)

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()


In [None]:
# shapes

# Load the GTFS shapes file: first line is the header, column types inferred.
shapes_path = "s3a://raw/shapes.txt"
df = spark.read.csv(shapes_path, header=True, inferSchema=True)

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()


In [None]:
# routes

# Load the GTFS routes file: first line is the header, column types inferred.
routes_path = "s3a://raw/routes.txt"
df = spark.read.csv(routes_path, header=True, inferSchema=True)

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()



In [None]:
# frequencies

# Load the GTFS frequencies file: first line is the header, column types inferred.
frequencies_path = "s3a://raw/frequencies.txt"
df = spark.read.csv(frequencies_path, header=True, inferSchema=True)

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()


In [None]:

# linhas_from_api

# The JSON reader always infers the schema when none is supplied; "inferSchema"
# is a CSV-only option and had no effect here, so it is omitted.
df = spark.read.json("s3a://raw/linhas_from_api.json")

# Preview the first rows and the inferred schema.
df.show(10)
df.printSchema()


In [None]:
# posicoes

# NOTE(review): the object has a .csv extension but is parsed as JSON;
# presumably it stores raw JSON API responses — confirm. "inferSchema" is a
# CSV-only option (JSON schemas are always inferred), so it is omitted.
df = spark.read.json("s3a://raw/posicoes.csv")

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col


def _quote(name):
    """Return *name* as a backtick-quoted Spark column identifier.

    Quoting stops Spark from interpreting dots in a column name as
    nested-field access; embedded backticks are escaped by doubling.
    """
    return "`" + name.replace("`", "``") + "`"


def _complex_fields(df):
    """Map each top-level array or struct column of *df* to its data type."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten(df):
    """Flatten every struct and array column of *df* until none remain.

    Struct columns are replaced by one column per sub-field, named
    ``<parent>_<child>``. Array columns are exploded with ``explode_outer``:
    one row per element, and a null or empty array yields a single row with a
    null value. Exploding several sibling arrays therefore produces the
    cartesian product of their elements. Each processed column is printed.

    Returns the flattened DataFrame.
    """
    complex_fields = _complex_fields(df)
    while complex_fields:
        # Take the first complex column in schema order.
        col_name, data_type = next(iter(complex_fields.items()))
        print("Processing :" + col_name + " Type : " + str(type(data_type)))

        if isinstance(data_type, StructType):
            # Promote every sub-field to a top-level column, then drop the struct.
            parent = col(_quote(col_name))
            expanded = [
                parent.getField(child.name).alias(col_name + "_" + child.name)
                for child in data_type.fields
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(data_type, ArrayType):
            # Replace the array column by its elements, one per row.
            df = df.withColumn(col_name, explode_outer(col(_quote(col_name))))

        # Newly exposed columns may themselves be structs/arrays.
        complex_fields = _complex_fields(df)
    return df


df_flatten = flatten(df)
df_flatten.show()

# Write the flattened DataFrame to ClickHouse over JDBC. mode("overwrite")
# drops and recreates the table, and createTableOptions is appended to the
# generated CREATE TABLE, so the ORDER BY columns must exist in df_flatten.
df_flatten.write \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "posicoes") \
    .option("createTableOptions", "ENGINE=MergeTree() ORDER BY (hr,l_vs_p)") \
    .mode("overwrite") \
    .save()

In [None]:
# previsao

# NOTE(review): the object has a .csv extension but is parsed as JSON;
# presumably it stores raw JSON API responses — confirm. "inferSchema" is a
# CSV-only option (JSON schemas are always inferred), so it is omitted.
df = spark.read.json("s3a://raw/previsao.csv")

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col


def _quote(name):
    """Return *name* as a backtick-quoted Spark column identifier.

    Quoting stops Spark from interpreting dots in a column name as
    nested-field access; embedded backticks are escaped by doubling.
    """
    return "`" + name.replace("`", "``") + "`"


def _complex_fields(df):
    """Map each top-level array or struct column of *df* to its data type."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten(df):
    """Flatten every struct and array column of *df* until none remain.

    Struct columns are replaced by one column per sub-field, named
    ``<parent>_<child>``. Array columns are exploded with ``explode_outer``:
    one row per element, and a null or empty array yields a single row with a
    null value. Exploding several sibling arrays therefore produces the
    cartesian product of their elements. Each processed column is printed.

    Returns the flattened DataFrame.
    """
    complex_fields = _complex_fields(df)
    while complex_fields:
        # Take the first complex column in schema order.
        col_name, data_type = next(iter(complex_fields.items()))
        print("Processing :" + col_name + " Type : " + str(type(data_type)))

        if isinstance(data_type, StructType):
            # Promote every sub-field to a top-level column, then drop the struct.
            parent = col(_quote(col_name))
            expanded = [
                parent.getField(child.name).alias(col_name + "_" + child.name)
                for child in data_type.fields
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(data_type, ArrayType):
            # Replace the array column by its elements, one per row.
            df = df.withColumn(col_name, explode_outer(col(_quote(col_name))))

        # Newly exposed columns may themselves be structs/arrays.
        complex_fields = _complex_fields(df)
    return df


df_flatten = flatten(df)
df_flatten.show()

# Write the flattened DataFrame to ClickHouse over JDBC. mode("overwrite")
# drops and recreates the table, and createTableOptions is appended to the
# generated CREATE TABLE, so the ORDER BY columns must exist in df_flatten.
# NOTE(review): this sorting key is identical to the posicoes cell; confirm
# that `hr` and `l_vs_p` really are columns of the flattened previsao data —
# ClickHouse rejects the CREATE TABLE if a sorting-key column is missing.
df_flatten.write \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "previsao") \
    .option("createTableOptions", "ENGINE=MergeTree() ORDER BY (hr,l_vs_p)") \
    .mode("overwrite") \
    .save()


In [None]:
# linhas

# NOTE(review): the object has a .csv extension but is parsed as JSON;
# presumably it stores raw JSON API responses — confirm. "inferSchema" is a
# CSV-only option (JSON schemas are always inferred), so it is omitted.
df = spark.read.json("s3a://raw/linhas.csv")

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col


def _quote(name):
    """Return *name* as a backtick-quoted Spark column identifier.

    Quoting stops Spark from interpreting dots in a column name as
    nested-field access; embedded backticks are escaped by doubling.
    """
    return "`" + name.replace("`", "``") + "`"


def _complex_fields(df):
    """Map each top-level array or struct column of *df* to its data type."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten(df):
    """Flatten every struct and array column of *df* until none remain.

    Struct columns are replaced by one column per sub-field, named
    ``<parent>_<child>``. Array columns are exploded with ``explode_outer``:
    one row per element, and a null or empty array yields a single row with a
    null value. Exploding several sibling arrays therefore produces the
    cartesian product of their elements. Each processed column is printed.

    Returns the flattened DataFrame.
    """
    complex_fields = _complex_fields(df)
    while complex_fields:
        # Take the first complex column in schema order.
        col_name, data_type = next(iter(complex_fields.items()))
        print("Processing :" + col_name + " Type : " + str(type(data_type)))

        if isinstance(data_type, StructType):
            # Promote every sub-field to a top-level column, then drop the struct.
            parent = col(_quote(col_name))
            expanded = [
                parent.getField(child.name).alias(col_name + "_" + child.name)
                for child in data_type.fields
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(data_type, ArrayType):
            # Replace the array column by its elements, one per row.
            df = df.withColumn(col_name, explode_outer(col(_quote(col_name))))

        # Newly exposed columns may themselves be structs/arrays.
        complex_fields = _complex_fields(df)
    return df


df_flatten = flatten(df)
df_flatten.show()

# Write the flattened DataFrame to ClickHouse over JDBC. mode("overwrite")
# drops and recreates the table, and createTableOptions is appended to the
# generated CREATE TABLE, so the ORDER BY column `cl` must exist in df_flatten.
df_flatten.write \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "linhas") \
    .option("createTableOptions", "ENGINE=MergeTree() ORDER BY (cl)") \
    .mode("overwrite") \
    .save()
