## CRIANDO CONEXÃO

In [3]:
from pyspark.sql import SparkSession
# Reuse the running session (or create one) named "Aula", with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Aula")
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook displays the SparkSession object.
spark

## CRIANDO DATAFRAME MANUALMENTE

In [5]:
# Small hand-built DataFrame with an `id` and a `name` column (three students).
df_aluno = spark.createDataFrame(
    [
        (1, "Fabio"),
        (2, "Joao"),
        (3, "Fernando"),
    ],
    ["id", "name"],
)

In [6]:
# Object type (the output below shows it is a pyspark.sql DataFrame)
type(df_aluno)

pyspark.sql.dataframe.DataFrame

## HELP

In [None]:
# Print the built-in documentation of the DataFrame API.
help(df_aluno)

In [7]:
# Write df_aluno as JSON files under s3a://raw/aluno.
# NOTE(review): no save mode is given, so Spark's default applies (presumably
# "error if the path exists") — re-running this cell will likely fail; confirm.
df_aluno.write.json('s3a://raw/aluno')

## CRIANDO DF A PARTIR DE ARQUIVOS

In [None]:
# CSV on the Hadoop filesystem

In [None]:
# header=True: the first line provides the column names.
df_csv = spark.read.csv('/datalake/raw/pessoas',header=True)

In [None]:
# JSON on the Hadoop filesystem

In [None]:
df_json = spark.read.json('/datalake/raw/user')

In [None]:
# JSON on S3 (s3a:// scheme)

In [None]:
df_jsons3 = spark.read.json('s3a://camada-bronze/user/')

In [None]:
# Generic loader: same CSV as above, with format and reader options passed explicitly
# (inferSchema asks Spark to guess column types instead of reading everything as text).
df_teste = spark.read.load("/datalake/raw/pessoas",format="csv", sep=",", inferSchema="true", header="true")

In [None]:
# CSV on the local filesystem of the machine running the read (file:// scheme)
df_local = spark.read.csv('file:///home/user/dados.txt',header=True)

## CRIANDO DF A PARTIR DE BANCO DE DADOS

In [None]:
import os

# JDBC connection settings for the `dvdrental` PostgreSQL database.
# The URL and credentials can be overridden through environment variables so
# real secrets do not have to live in the notebook; the defaults are the
# original lab values, so behaviour is unchanged when nothing is set.
url = os.environ.get("DVDRENTAL_JDBC_URL", "jdbc:postgresql://postgres:5432/dvdrental")

properties = {
    "user": os.environ.get("DVDRENTAL_USER", "admin"),
    "password": os.environ.get("DVDRENTAL_PASSWORD", "admin"),
    # JDBC driver class; its jar must be available to Spark.
    "driver": "org.postgresql.Driver",
}

In [None]:
# Load whole tables over JDBC using `url` and `properties`.
df_city = spark.read.jdbc(url=url,table='public.city',properties=properties)
# NOTE: `df_county` holds the public.country table ("county" is a typo of
# "country"); the name is kept because later cells refer to it.
df_county = spark.read.jdbc(url=url,table='public.country',properties=properties)

In [None]:
# City/country join passed to JDBC in place of a table name; the trailing
# `as tab` alias lets the subquery be used as a derived table.
query = (
    '(select c.city_id ,city, country from public.city c '
    '        inner join public.country c2 '
    '        on c2.country_id  = c.country_id) as tab '
)

In [None]:
# Read the result of the subquery in `query` instead of a whole table.
df_query = spark.read.jdbc(url=url,table=query,properties=properties)

## AÇÃO E TRANSFORMAÇÃO

In [None]:
# ACTION: these calls actually run a job (show prints 10 rows, count returns the row total).
df_city.show(10)
df_city.count()

In [None]:
# TRANSFORMATION: chained filters only build a new plan; explain(extended=True)
# prints the logical and physical plans for that plan.
df_city2 = df_city.filter(df_city.country_id == 101).filter(df_city.city.startswith('A'))
df_city2.explain(extended=True)

## VALIDANDO DATAFRAME

In [None]:
# print: show the first 10 rows without truncating long values

In [None]:
df_city.show(10,truncate=False)

In [None]:
# describe: summary statistics per column

In [None]:
df_city.describe().show()

In [None]:
# printSchema: column names, types and nullability as a tree

In [None]:
df_city.printSchema()

In [None]:
# count(): number of rows

In [None]:
df_city.count()

In [None]:
# sample([withReplacement, fraction, seed]) Returns a sampled subset of this DataFrame.
# 0.1 is presumably taken as the fraction (~10% of rows); no seed is given,
# so the sampled rows can differ between runs.
df_city.sample(0.1).show()

## MANIPULANDO DF

In [None]:
# isEmpty() Returns True if this DataFrame is empty.
df_city.isEmpty()

In [None]:
# select(*cols) Projects a set of expressions and returns a new DataFrame.
df_city.select('city').show()

In [None]:
# orderBy(*cols, **kwargs) Returns a new DataFrame sorted by the specified column(s).
# sort(*cols, **kwargs) is an equivalent spelling.

df_city.orderBy('city',ascending=True).show(5)

In [None]:
# Sort direction via Column methods. NOTE: the imported desc/asc functions are
# not used here; the .asc()/.desc() Column methods do the same job.
from pyspark.sql.functions import desc, asc
df_city.orderBy(df_city.city.asc()).show(5)
df_city.orderBy(df_city.city.desc()).show(5)

In [None]:
# where and filter are interchangeable (this cell and the next return the same rows)
df_city.where(df_city.city_id == 1).show(truncate=False)

In [None]:
df_city.filter(df_city.city_id == 1).show(truncate=False)

In [None]:
# fillna(value[, subset]) Replace null values, alias for na.fill().
# Filling with '' presumably only affects string-typed columns — confirm.
df_city.na.fill('').show()

In [None]:
# distinct() Returns a new DataFrame containing the distinct rows in this DataFrame.
df_city.select('city').distinct().show()

In [None]:
# first() Returns the first row as a Row.
df_city.first()


In [None]:
# head([n]) Returns the first n rows.
# NOTE(review): called without n, which presumably returns a single Row (like first()) rather than a list.
df_city.head()


In [None]:
# limit(num) Limits the result count to the number specified (returns a DataFrame).
df_city.limit(5).show()

In [None]:
# tail(num) Returns the last num rows as a list of Row.
df_city.tail(5)


In [None]:
# take(num) Returns the first num rows as a list of Row.
df_city.take(5)

In [None]:
# collect() Returns all the records as a list of Row.
# Every row is materialised as a Python list — only reasonable for small tables.
df_city.collect()

In [None]:
# groupBy(*cols) Groups the DataFrame using the specified columns, so we can run aggregation on them.
# Here: number of cities per country_id.
df_city.groupby('country_id').count().show()

In [None]:
# agg(*exprs) aggregates using a {column: function} mapping. Without groupBy it
# aggregates the whole DataFrame (shorthand for df.groupBy().agg()); here it runs
# per country_id group and sums the grouping key itself, which is only illustrative.
df_city \
    .groupby('country_id') \
    .agg({"country_id": "sum"}) \
    .show(5)

In [None]:
# join inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
# Joining on a Column expression keeps both country_id columns in the result.
df_city.join(df_county,df_city.country_id == df_county.country_id,'inner').show()

In [None]:
# union: appends rows, matching columns by position (one column on each side here)
df_city.select('country_id').union(df_county.select('country_id')).show(5)

In [None]:
# toDF(*cols) Returns a new DataFrame with the specified column names
# toJSON([use_unicode])
# toPandas()
# The toJSON() result below is neither assigned nor displayed.
df_city.toJSON()
# toPandas() pulls every row into a local pandas DataFrame.
p = df_city.toPandas()
p




In [None]:
# Partitions: show how many partitions back df_city, then redistribute it.
print(df_city.rdd.getNumPartitions())
# repartition(2) returns a new DataFrame with 2 partitions (the source is not
# modified), so the result is bound back to df_city.
df_city = df_city.repartition(2)
df_city.rdd.getNumPartitions()
# Scala equivalent: df.rdd.partitions.length (or .size)

In [None]:
# drop(*cols) Returns a new DataFrame without specified columns.
df_city.drop('last_update').show()

In [None]:
# dropDuplicates([subset]) Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
# NOTE: only the last expression of a cell is displayed; wrap the first count
# in print() to see both numbers.
df_city.select('city').count()
df_city.select('city').dropDuplicates().count()

In [None]:
# dropna([how, thresh, subset]) Returns a new DataFrame omitting rows with null values.
df_city.na.drop().show()

In [None]:
# withColumn(colName, col) Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
# monotonically_increasing_id() yields increasing, unique ids — presumably not
# consecutive across partitions; confirm before relying on them as row numbers.
from pyspark.sql.functions import monotonically_increasing_id
df_city.withColumn('new_id',monotonically_increasing_id()).show(5)

In [None]:
# withColumnRenamed(existing, new) Returns a new DataFrame by renaming an existing column.
df_city.withColumnRenamed('last_update','updated').show()

In [None]:
# Change a column's data type: three spellings of a cast to integer.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Each call returns a new DataFrame; df_aluno itself is not modified, and only
# the last result is displayed by the notebook.
df_aluno.withColumn("id",col("id").cast(IntegerType()))
df_aluno.withColumn("id",col("id").cast("int"))
df_aluno.withColumn("id",col("id").cast("integer"))

In [None]:
# foreach(f): call f on every Row of the DataFrame.
# foreachPartition(f): call f once per partition (with an iterator of Rows).
def func(df):
    """Print the ``city`` field of the single Row handed in by ``foreach``.

    Despite its name, ``df`` is one Row, not a DataFrame.
    NOTE(review): foreach runs this on the executors, so the printed text
    presumably lands in executor stdout rather than the notebook — confirm.
    """
    city_name = df.city
    print(city_name)


df_city.foreach(func)

## SQL

In [None]:
# createOrReplaceGlobalTempView(name)
# createOrReplaceTempView(name)
# createTempView(name)
# Register df_city as a temporary view named `city` so it can be queried with SQL.
df_city.createOrReplaceTempView('city')

In [None]:
spark.sql('select * from city limit 5').show()

## PLANO DE EXECUÇÃO

In [None]:
# explain([extended, mode]) Prints the (logical and physical) plans to the console for debugging purposes.
# Without arguments only the physical plan is printed.
df_city.explain() 

## TUNING

In [None]:
# Adaptive Query Execution — enabled by default since Spark 3.2; set explicitly here.
spark.conf.set("spark.sql.adaptive.enabled", "true")

In [None]:
# cache / persist / unpersist
# cache() and persist() (no arguments) mark df_city for reuse; unpersist() drops that mark.
# NOTE(review): no action runs between these calls, so nothing is actually
# materialised in the cache here — the cell only demonstrates the API.
df_city.cache()
df_city.persist()
df_city.unpersist()

In [None]:
# HINT: ask the optimiser to repartition the query result into 5 partitions;
# explain() prints the resulting plan.
spark.sql('SELECT /*+ REPARTITION(5) */ * FROM city;').explain()

In [None]:
# BROADCAST VARIABLE: ship a read-only lookup dict to the executors once;
# tasks read it through broadcastStates.value.
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

## SALVANDO DADOS

In [None]:
# CSV on the Hadoop filesystem

In [None]:
# header=True writes the column names as the first line of each part file.
df_city.write.csv('/datalake/process/city',header=True,sep=',',mode='overwrite')

In [None]:
# CSV on S3

In [None]:
# NOTE(review): unlike the HDFS write above, no header=True here, so the
# column names are not stored in the files — confirm that is intended.
df_city.write.csv('s3a://camada-prata/city/',mode='overwrite')

In [None]:
# JSON

In [None]:
df_city.write.json('s3a://camada-prata/city_json/',mode='overwrite')

In [None]:
# PARQUET

In [None]:
df_city.write.parquet('s3a://camada-prata/city_parquet/',mode='overwrite')

In [None]:
# Query Parquet files directly by path, without registering a table.
spark.sql("SELECT * FROM parquet.`s3a://camada-prata/city_parquet/`").show()

In [None]:
# ORC

In [None]:
df_city.write.orc('s3a://camada-prata/city_orc/',mode='overwrite')

In [None]:
# DATABASE: write df_city to the table public.df over JDBC, replacing existing data
df_city.write \
    .jdbc(url=url, table='public.df',properties=properties,mode='overwrite')

## HIVE

In [None]:
# Raw user records (nested JSON) from the bronze bucket.
df_raw = spark.read.json('s3a://camada-bronze/user/')

In [None]:
# Flatten the nested user JSON by selecting struct fields through dotted paths.
# NOTE(review): this keeps sensitive fields (password, cc_number,
# social_insurance_number) — confirm they belong in the Hive table.
df_final = df_raw.select(
    'address.city',
    'address.coordinates.lat',
    'address.coordinates.lng',
    'address.country',
    'address.state',
    'address.street_address',
    'address.street_name',
    'address.zip_code',
    'avatar',
    'credit_card.cc_number',
    'date_of_birth',
    'email',
    'employment.key_skill',
    'employment.title',
    'first_name',
    'gender',
    'id',
    'last_name',
    'password',
    'phone_number',
    'social_insurance_number',
    'subscription.payment_method',
    'subscription.plan',
    'subscription.status',
    'subscription.term',
    'uid',
    'username',
)

In [None]:
df_final.count()

In [None]:
# Save as a Hive-format table in the `default` database.
# NOTE(review): no save mode is given, so re-running presumably fails once
# default.teste3 exists — confirm.
df_final.write.format('hive').saveAsTable('default.teste3')

In [None]:
spark.sql("show tables from default").show()

In [None]:
# PARTITIONED write: one sub-directory per distinct country_id value.
# Uses df_city, the DataFrame in this section that has a country_id column.
df_city.write.partitionBy("country_id").format("parquet").save("/process/tabpart")

## DELTA LAKE

In [1]:
import requests
import json
import pyspark
from delta import *

# Session for the Delta Lake section: registers the Delta SQL extension and
# catalog, and points the S3A filesystem at the MinIO endpoint (path-style access).
# NOTE(review): the S3 access/secret keys are hard-coded lab credentials.
builder = (
    pyspark.sql.SparkSession.builder.appName("delta")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.access.key", "datalake")
    .config("spark.hadoop.fs.s3a.secret.key", "datalake")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)

# configure_spark_with_delta_pip (from the `delta` package) presumably adds the
# Delta jars matching the installed pip package to the builder.
spark = configure_spark_with_delta_pip(builder).enableHiveSupport().getOrCreate()

In [2]:
def loadData(qtde):
    """Fetch ``qtde`` random users from random-data-api.com as one DataFrame.

    Each request returns one user as JSON. The payloads are collected on the
    driver and turned into a single Spark DataFrame after the loop, keeping
    only the ``email``, ``first_name``, ``last_name``, ``gender``, ``id`` and
    ``username`` columns. The loop index is printed as progress.

    Args:
        qtde: number of users to fetch; must be at least 1.

    Returns:
        A DataFrame with one row per fetched user.

    Raises:
        ValueError: if ``qtde`` is less than 1 (there is nothing to build a
            DataFrame from).
        requests.HTTPError: if the API answers with an error status.
    """
    if qtde < 1:
        raise ValueError(f"qtde must be >= 1, got {qtde}")

    payloads = []
    for x in range(qtde):
        print(x)  # progress indicator
        r = requests.get('https://random-data-api.com/api/v2/users', timeout=30)
        r.raise_for_status()
        # Keep the raw JSON text: spark.read.json parses JSON strings, whereas a
        # decoded dict would be stringified with Python (not JSON) syntax.
        payloads.append(r.text)

    # Build the DataFrame once, after all payloads have arrived, instead of
    # re-reading the growing list on every iteration.
    req = spark.read.json(spark.sparkContext.parallelize(payloads))
    return req.select('email', 'first_name', 'last_name', 'gender', 'id', 'username')

In [5]:
# Fetch two random users (the loop prints 0 and 1 as progress).
df = loadData(2)

0
1


In [6]:
# Show the fetched users without truncating values.
df.show(truncate=False)

+-----------------------+----------+---------+----------+----+-------------+
|email                  |first_name|last_name|gender    |id  |username     |
+-----------------------+----------+---------+----------+----+-------------+
|buford.cole@email.com  |Buford    |Cole     |Polygender|1860|buford.cole  |
|hal.kertzmann@email.com|Hal       |Kertzmann|Bigender  |4111|hal.kertzmann|
+-----------------------+----------+---------+----------+----+-------------+



In [7]:
# Location of the Delta table in the `raw` bucket (S3A, served by MinIO in this setup).
path = 's3a://raw/presto/sales_data_new'

In [8]:
# Keep only the id column; the Delta table written below therefore has just `id`.
df = df.select('id')

In [9]:
df.show()

+----+
|  id|
+----+
|1860|
|4111|
+----+



In [10]:
# Save as a Delta table at `path`, replacing any existing data.
df.write.format("delta").mode('overwrite').save(path)

In [None]:
# DeltaTable handle (supports merge/update/delete); DeltaTable comes from the `delta` star import.
deltaTable = DeltaTable.forPath(spark, path)

In [None]:
type(deltaTable)

In [None]:
# Plain DataFrame read of the same table, for comparison with the handle above.
d = spark.read.format("delta").load(path)

In [None]:
type(d)

In [None]:
# A DeltaTable exposes its current contents as a DataFrame via toDF().
deltaTable.toDF().show(truncate=False)

In [None]:
### MERGE
# NEW DATA: fetch 10 more random users to upsert into the Delta table.
raw = loadData(10)

In [None]:
raw.show(truncate=False)

In [None]:
# NOTE(review): `from pyspark.sql.functions import *` also brings names such as
# sum/min/max/round into the notebook namespace, shadowing the Python builtins;
# importing only `col` would avoid that.
from delta.tables import *
from pyspark.sql.functions import *

# Upsert `raw` into the Delta table, matching rows on id:
#   - ids already present: update email, first_name, last_name, gender, username;
#   - new ids: insert all six columns.
# The insert values use unqualified column names, the update uses `raw.`-qualified ones.
# NOTE(review): the target table at `path` was written from df.select('id'), so it
# only has an `id` column; inserting/updating the other columns presumably needs
# Delta schema evolution (e.g. spark.databricks.delta.schema.autoMerge.enabled) — confirm.
deltaTable.alias("process") \
  .merge(
    raw.alias("raw"),
    "process.id = raw.id") \
  .whenNotMatchedInsert(values = {'email' : col('email') \
                                 ,'first_name' : col('first_name') \
                                 ,'last_name' : col('last_name') \
                                 ,'gender' : col('gender') \
                                 ,'id' : col('id') \
                                 ,'username' : col('username') \
                                 }) \
  .whenMatchedUpdate(set = {'email' : col('raw.email') \
                                 ,'first_name' : col('raw.first_name') \
                                 ,'last_name' : col('raw.last_name') \
                                 ,'gender' : col('raw.gender') \
                                 ,'username' : col('raw.username') \
                                 }) \
  .execute()

In [None]:
# Inspect the table contents after the merge.
deltaTable.toDF().show(truncate=False)

## SPARK SUBMIT

In [None]:
from pyspark.sql import SparkSession
import requests
# Reuse the running session (or create one) named "Aula", with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Aula")
    .enableHiveSupport()
    .getOrCreate()
)
def loadData(qtde):
    """Fetch ``qtde`` random users from random-data-api.com as one DataFrame.

    Each request returns one user as JSON. The payloads are collected on the
    driver and turned into a single Spark DataFrame after the loop, keeping
    only the ``email``, ``first_name``, ``last_name``, ``gender``, ``id`` and
    ``username`` columns. The loop index is printed as progress.

    Args:
        qtde: number of users to fetch; must be at least 1.

    Returns:
        A DataFrame with one row per fetched user.

    Raises:
        ValueError: if ``qtde`` is less than 1 (there is nothing to build a
            DataFrame from).
        requests.HTTPError: if the API answers with an error status.
    """
    if qtde < 1:
        raise ValueError(f"qtde must be >= 1, got {qtde}")

    payloads = []
    for x in range(qtde):
        print(x)  # progress indicator
        r = requests.get('https://random-data-api.com/api/v2/users', timeout=30)
        r.raise_for_status()
        # Keep the raw JSON text: spark.read.json parses JSON strings, whereas a
        # decoded dict would be stringified with Python (not JSON) syntax.
        payloads.append(r.text)

    # Build the DataFrame once, after all payloads have arrived, instead of
    # re-reading the growing list on every iteration.
    req = spark.read.json(spark.sparkContext.parallelize(payloads))
    return req.select('email', 'first_name', 'last_name', 'gender', 'id', 'username')

# Fetch 10 users and append them to the raw area as a single Parquet file
# (repartition(1) collapses the output to one partition).
df = loadData(10)
df.repartition(1).write.mode('append').parquet('/datalake/raw/api')

In [None]:
#spark-submit --master spark://spark-master:7077 --supervise --executor-memory 1G --total-executor-cores 1 programa.py

## STREAMING

In [None]:
from pyspark.sql.types import  StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
from pyspark.sql.functions import expr, from_json, col, concat
from pyspark.sql import Window
from pyspark.sql import SparkSession
import requests
# Reuse the running session (or create one) named "Aula", with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Aula")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
#API
#https://www.boredapi.com/api/activity/

In [None]:
# Expected structure of each Kafka message value: one Bored API activity as JSON.
# All fields are nullable (StructField's default).
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("activity", StringType()),
        ("type", StringType()),
        ("participants", IntegerType()),
        ("price", DoubleType()),
        ("link", StringType()),
        ("key", StringType()),
        ("accessibility", DoubleType()),
    )
])
schema

In [None]:
# Create a streaming DataFrame from Kafka: broker address, topic and starting offset.
# NOTE: checkpointing is configured on the writer (see checkpointLocation on
# writeStream), not on readStream — hence the commented-out option below.
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:9092") # kafka server
  .option("subscribe", "atividade") # topic
  .option("startingOffsets", "earliest") # start from the beginning of the topic
  #.option("checkpoint","s3a://tmp/checkpoint")    
  .option("kafka.group.id", "spark3")
  .load() 
  )

In [None]:
# True for a DataFrame created with readStream.
df.isStreaming

In [None]:
# Transform to the output DataFrame: cast the Kafka `value` column to string
# and parse it as JSON with `schema` into a struct column named `value`.
value_df = df.select(from_json(col("value").cast("string"),schema).alias("value"))

In [None]:
# Flatten the parsed `value` struct into top-level columns, in schema order.
# Each field is selected exactly once: a duplicated column name makes later
# references ambiguous, and file sinks presumably reject duplicate names.
exploded_df = value_df.selectExpr(
    'value.activity',
    'value.type',
    'value.participants',
    'value.price',
    'value.link',
    'value.key',
    'value.accessibility',
)

In [None]:
exploded_df.printSchema()

In [None]:
# WRITE TO MEMORY: the memory sink keeps the results in an in-memory table
# named by queryName ("qraw"), which spark.sql can query below.
rawQuery = exploded_df \
        .writeStream \
        .queryName("qraw")\
        .format("memory")\
        .start()

In [None]:
print(type(df))

In [None]:
# Current status of the running streaming query.
rawQuery.status

In [None]:
# Snapshot of what the memory sink has received so far.
# NOTE: this rebinds `raw`, which earlier held the users fetched for the Delta merge.
raw = spark.sql("select * from qraw")


In [None]:
raw.show(truncate=False)

In [None]:
raw.count()

In [None]:
raw.select('*').show()

In [None]:
# WRITE TO DISK: append each micro-batch as JSON files under /datalake/raw/streaming.
# NOTE(review): checkpointLocation is a relative path, so where it lands depends
# on the default filesystem/working directory — an absolute path is safer.
output_query = exploded_df.writeStream\
        .format("json")\
        .option("path","/datalake/raw/streaming")\
        .option("checkpointLocation", "chck-pnt-dir-kh")\
        .outputMode("append")\
        .queryName("SS Writter")\
        .start()

In [None]:
# Block this cell until the streaming query stops (or fails).
output_query.awaitTermination()