In [None]:
from pyspark.sql import SparkSession,Row,DataFrame
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
import pyspark.sql.functions as F
import traceback

# Extra Maven packages needed besides Delta itself.
# They are passed through `extra_packages` because:
#   * setting "spark.jars.packages" twice on the builder keeps only the last value, and
#   * configure_spark_with_delta_pip sets "spark.jars.packages" itself, which would
#     replace anything configured on the builder beforehand.
EXTRA_PACKAGES = [
    "uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5",
    "org.apache.spark:spark-avro_2.12:3.5.0",
]

# Local 4-thread session with the Delta SQL extension and Delta catalog enabled.
# Managed tables created with saveAsTable are stored under the warehouse dir below.
builder = (
    SparkSession.builder
    .appName("Data with Nikk the Greek Spark Session")
    .master("local[4]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.warehouse.dir", "/Users/eduardoalberto/LoadFile/repository/deltaTable/")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder, extra_packages=EXTRA_PACKAGES).getOrCreate()

sc = spark.sparkContext
# NOTE(review): "OFF" silences every Spark log message, including errors.
sc.setLogLevel("OFF")
print('PySpark Version :'+spark.version)
print('PySpark Version :'+spark.sparkContext.version)

# Last expression of the cell: renders the session summary in the notebook.
spark


In [None]:
# Convert the movie parquet dataset into a Delta table stored at `path_delta`.
# Any data already present at that location is replaced (mode "overwrite").
path_delta = "/Users/eduardoalberto/LoadFile/dataDelta/movie"
df = spark.read.parquet("/Users/eduardoalberto/LoadFile/parquet/movie")
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save(path_delta)
)

### Read the Delta table back as a DataFrame

In [None]:
# Read the Delta table written at `path_delta` and print its rows
# without truncating long cell values.
(
    spark.read
    .format("delta")
    .load(path_delta)
    .show(truncate=False)
)

In [None]:
# Load the semicolon-delimited CSV; the first line is the header and
# column types are inferred by Spark.
dfs = (
    spark.read
    .option("delimiter", ';')
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("/Users/eduardoalberto/LoadFile/part-00000-055103f0-b275-4e27-b667-0c2c25d0636a-c000.csv")
)
# Optional inspection helpers (left disabled):
# dfs.printSchema()
# dfs.toPandas()

In [None]:
# Columns used as the grouping key for both output frames.
rating_group_cols = [
    "name", "imdb_id", "overview", "revenue", "runtime", "status",
    "title", "vote_average", "vote_count", "popularity", "name_geners",
]

# Shared (lazy) aggregation: number of rows per grouping key.
rating_counts = dfs.groupBy(*rating_group_cols).agg(F.count("title").alias("total"))

# df01 is stamped with today's date, df02 with today's date plus two days.
df01 = rating_counts.withColumn("dt_ref_carga", F.current_date())
df02 = rating_counts.withColumn("dt_ref_carga", F.current_date()+2)

df02.toPandas()

In [None]:
arqDelta = "/Users/eduardoalberto/LoadFile/dataDelta/ratingMovie"

# Persist both frames as Delta tables partitioned by the load date.
# "overwrite" + "overwriteSchema" replaces both the data and the schema of an existing table.
for frame, table_name in ((df01, "tb_ratingMovie"), (df02, "st_ratingMovie")):
    (
        frame.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("dt_ref_carga")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )


In [None]:
# Print the commit history recorded for tb_ratingMovie.
history_df = spark.sql("desc history tb_ratingMovie")
history_df.show()

In [None]:
# A notebook cell only renders its LAST expression, so the first frame must be
# shown explicitly with display(); otherwise it is computed and then discarded.
display(spark.table("st_ratingMovie").filter("imdb_id = 'tt0036818'").toPandas())

# Last expression of the cell: rendered automatically.
spark.table("tb_ratingmovie").toPandas()

In [None]:
# Change one movie in the staging table so the following merge has a difference to apply.
# The statement also moves the row to a different partition value (dt_ref_carga).
update_stmt = """ update st_ratingMovie
              set imdb_id = "tt0036818",
                  revenue = 1,
                  vote_average = 7,
                  vote_count = 42,
                  dt_ref_carga = current_date()+1
               where imdb_id = 'tt0036818'    
"""
spark.sql(update_stmt)

# Show the staging row(s) for that movie after the update.
updated_row = spark.table("st_ratingMovie").filter("imdb_id = 'tt0036818' ")
updated_row.toPandas()

### MERGE UPDATE SPARK SQL

In [None]:
# Upsert the staging table into the target table, matching rows on imdb_id:
# matched rows take every column from the source, unmatched source rows are inserted.
# NOTE(review): assumes imdb_id identifies at most one source row per target row - confirm.
merge_stmt = """merge into tb_ratingMovie T1
             using st_ratingMovie T2
             on T1.imdb_id = T2.imdb_id
             when matched then update set *
             when not matched by target then insert *
"""
spark.sql(merge_stmt)

In [None]:
# Check the target table row(s) for the movie that was changed in staging.
# (To list only the load dates instead: spark.table("tb_ratingMovie").select("dt_ref_carga").show())
merged_row = spark.table("tb_ratingMovie").filter("imdb_id = 'tt0036818' ")
merged_row.toPandas()

### MERGE UPDATE PYSPARK

In [None]:
# Resolve both tables through the catalog by name, like the other cells do,
# instead of hard-coding absolute paths that duplicate the configured warehouse dir.
dftg = DeltaTable.forName(spark, "tb_ratingMovie")   # merge target
dfsr = spark.table("st_ratingMovie")                 # merge source

# Same upsert as the SQL version, plus deletion of target rows that have no
# counterpart in the source (whenNotMatchedBySourceDelete).
# NOTE(review): assumes imdb_id identifies at most one source row per target row - confirm.
(
    dftg.alias("dftg")
    .merge(dfsr.alias("dfsr"), "dftg.imdb_id = dfsr.imdb_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .whenNotMatchedBySourceDelete()
    .execute()
)



In [None]:
# Show the target table row(s) for the changed movie after the PySpark merge.
target_after_merge = spark.table("tb_ratingmovie")
target_after_merge.filter("imdb_id = 'tt0036818'").toPandas()

In [None]:



import os.path as path


def LaodDelta (dir):
    """Load the rating CSV at ``dir`` and (re)create the two rating Delta tables.

    The CSV is read with ``;`` as delimiter, a header row and inferred types,
    aggregated once per grouping key, and written twice:

    * ``tb_ratingMovie`` stamped with today's date in ``dt_ref_carga``;
    * ``st_ratingMovie`` stamped with today's date plus two days.

    Both tables are overwritten (data and schema) and partitioned by ``dt_ref_carga``.

    Args:
        dir: Path of the CSV file. Plain filesystem paths are checked for
            existence up front; values containing ``://`` are passed to Spark
            unchecked because ``os.path`` cannot test them.

    Returns:
        None. Any error (including a missing file) is printed together with
        its traceback instead of being raised.
    """
    try:
        # The result of path.exists() used to be discarded, so a missing file only
        # surfaced later as a Spark error. Fail early with a clear message instead.
        if "://" not in str(dir) and not path.exists(dir):
            raise FileNotFoundError(f"Arquivo não encontrado: {dir}")

        raw = (
            spark.read
            .option("delimiter", ';')
            .option("header", "True")
            .option("inferSchema", "True")
            .csv(dir)
        )

        group_cols = [
            "name", "imdb_id", "overview", "revenue", "runtime", "status",
            "title", "vote_average", "vote_count", "popularity", "name_geners",
        ]
        # Aggregate once; both output tables only differ in the load date column.
        aggregated = raw.groupBy(*group_cols).agg(F.count("title").alias("total"))

        outputs = (
            ("tb_ratingMovie", F.current_date()),
            ("st_ratingMovie", F.current_date()+2),
        )
        for table_name, load_date in outputs:
            (
                aggregated.withColumn("dt_ref_carga", load_date)
                .write
                .format("delta")
                .mode("overwrite")
                .partitionBy("dt_ref_carga")
                .option("overwriteSchema", "true")
                .saveAsTable(table_name)
            )

    except Exception as e:
        print(f"Ocorreu o seguinte erro: {e}")
        traceback.print_exc()
    return None

def MergeDelta(dftg,dfsr):
    """Merge the source ``dfsr`` into the Delta target ``dftg`` on ``imdb_id``.

    Matched rows are updated with all source columns, source rows without a
    match are inserted, and target rows without a match in the source are deleted.

    Args:
        dftg: Merge target; must provide ``alias`` and ``merge`` (a DeltaTable in this notebook).
        dfsr: Merge source; must provide ``alias`` (a DataFrame in this notebook).

    Returns:
        None. Errors are printed with their traceback instead of being raised.
    """
    join_condition = "dftg.imdb_id = dfsr.imdb_id"
    try:
        target = dftg.alias("dftg")
        source = dfsr.alias("dfsr")

        merge_builder = target.merge(source, join_condition)
        merge_builder = merge_builder.whenMatchedUpdateAll()
        merge_builder = merge_builder.whenNotMatchedInsertAll()
        merge_builder = merge_builder.whenNotMatchedBySourceDelete()
        merge_builder.execute()

    except Exception as exc:
        print(f"Ocorreu o seguinte erro: {exc}")
        traceback.print_exc()
    return None
    


#################################################################################################################################################################

# NOTE(review): `dir` shadows the Python builtin of the same name; kept because
# other cells may reference this variable.
dir = "/Users/eduardoalberto/LoadFile/part-00000-055103f0-b275-4e27-b667-0c2c25d0636a-c000.csv"

# 1) (Re)create both tables first. The handles below used to be created BEFORE
#    this call, i.e. against tables that were about to be overwritten (or that
#    did not exist yet on a fresh environment).
LaodDelta(dir)

# 2) Open the freshly written tables: target as DeltaTable, source as DataFrame.
dftg = DeltaTable.forPath(spark, "/Users/eduardoalberto/LoadFile/repository/deltaTable/tb_ratingmovie")
dfsr = spark.read.format("delta").load("/Users/eduardoalberto/LoadFile/repository/deltaTable/st_ratingmovie")

# 3) Merge source into target.
MergeDelta(dftg,dfsr)
