## Overview

This notebook shows how to create and query a table or DataFrame from files that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside Databricks. This notebook assumes that the files you would like to read are already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [None]:
# Default settings shared by every CSV read below
file_type = "csv"
# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","


def read_trending_csv(path):
    """Read one trending-data CSV file into a DataFrame.

    Uses the module-level ``file_type``, ``infer_schema``,
    ``first_row_is_header`` and ``delimiter`` settings, plus the
    ``multiline`` option, exactly as every per-country read did before.

    Args:
        path: DBFS location of the CSV file.

    Returns:
        The DataFrame produced by ``spark.read``.
    """
    # The applied options are for CSV files. For other file types, these will be ignored.
    return (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .option("multiline", True)
        .load(path)
    )


def read_category_json(path):
    """Read one multi-line category JSON file into a DataFrame.

    Args:
        path: DBFS location of the JSON file.

    Returns:
        The DataFrame produced by ``spark.read``.
    """
    return spark.read.option("multiline", "true").json(path)


# NOTE: the CA and BR CSV files are read from /FileStore/tables/, while every
# other file is read from /FileStore/ directly. The paths are kept as they were.

# Canada (CA)
file_location_ca = "/FileStore/tables/CA_youtube_trending_data.csv"
df_ca_csv = read_trending_csv(file_location_ca)
df_ca_json = read_category_json("/FileStore/CA_category_id.json")

# Brazil (BR)
file_location_br = "/FileStore/tables/BR_youtube_trending_data.csv"
df_br_csv = read_trending_csv(file_location_br)
df_br_json = read_category_json("/FileStore/BR_category_id.json")

# Germany (DE)
file_location_de = "/FileStore/DE_youtube_trending_data.csv"
df_de_csv = read_trending_csv(file_location_de)
df_de_json = read_category_json("/FileStore/DE_category_id.json")

# France (FR)
file_location_fr = "/FileStore/FR_youtube_trending_data.csv"
df_fr_csv = read_trending_csv(file_location_fr)
df_fr_json = read_category_json("/FileStore/FR_category_id.json")

# Great Britain (GB)
file_location_gb = "/FileStore/GB_youtube_trending_data.csv"
df_gb_csv = read_trending_csv(file_location_gb)
df_gb_json = read_category_json("/FileStore/GB_category_id.json")

# India (IN)
file_location_in = "/FileStore/IN_youtube_trending_data.csv"
df_in_csv = read_trending_csv(file_location_in)
df_in_json = read_category_json("/FileStore/IN_category_id.json")

# Japan (JP)
file_location_jp = "/FileStore/JP_youtube_trending_data.csv"
df_jp_csv = read_trending_csv(file_location_jp)
df_jp_json = read_category_json("/FileStore/JP_category_id.json")

# Korea (KR)
file_location_kr = "/FileStore/KR_youtube_trending_data.csv"
df_kr_csv = read_trending_csv(file_location_kr)
df_kr_json = read_category_json("/FileStore/KR_category_id.json")

# Mexico (MX)
file_location_mx = "/FileStore/MX_youtube_trending_data.csv"
df_mx_csv = read_trending_csv(file_location_mx)
df_mx_json = read_category_json("/FileStore/MX_category_id.json")

# Russia (RU)
file_location_ru = "/FileStore/RU_youtube_trending_data.csv"
df_ru_csv = read_trending_csv(file_location_ru)
df_ru_json = read_category_json("/FileStore/RU_category_id.json")

# USA (US)
file_location_us = "/FileStore/US_youtube_trending_data.csv"
df_us_csv = read_trending_csv(file_location_us)
df_us_json = read_category_json("/FileStore/US_category_id.json")



In [None]:
# Goal: build one table with every category present in the JSON files
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col,desc

# Union the per-country category tables (one row per JSON file)
df_json = (
    df_ca_json.union(df_br_json)
    .union(df_de_json)
    .union(df_fr_json)
    .union(df_gb_json)
    .union(df_in_json)
    .union(df_jp_json)
    .union(df_kr_json)
    .union(df_mx_json)
    .union(df_ru_json)
    .union(df_us_json)
)

# Row count before flattening
display(df_json.count())

# Flatten: one row per element of `items`, keeping only the id and the
# snippet title (renamed to `category`), then drop duplicate pairs.
df_json = (
    df_json.select(explode(col("items")).alias("element"))
    .select("element.id", "element.snippet.title")
    .withColumnRenamed("title", "category")
    .distinct()
)

# Row count after de-duplication
display(df_json.count())

# `id` is a string column, so a plain orderBy("id") sorts lexicographically
# (1, 10, 15, ..., 2, 20). Order by its numeric value instead; the column
# itself stays a string so the later join on categoryId is unaffected.
df_json.orderBy(col("id").cast("int")).show()


1132+---+--------------------+
| id|            category|
+---+--------------------+
|  1|    Film & Animation|
| 10|               Music|
| 15|      Pets & Animals|
| 17|              Sports|
| 18|        Short Movies|
| 19|     Travel & Events|
|  2|    Autos & Vehicles|
| 20|              Gaming|
| 21|       Videoblogging|
| 22|      People & Blogs|
| 23|              Comedy|
| 24|       Entertainment|
| 25|     News & Politics|
| 26|       Howto & Style|
| 27|           Education|
| 28|Science & Technology|
| 29|Nonprofits & Acti...|
| 30|              Movies|
| 31|     Anime/Animation|
| 32|    Action/Adventure|
+---+--------------------+
only showing top 20 rows



In [None]:
# Goal: union all CSV tables, add the trending-days count, aggregate the
# counters per video and attach the category name.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, year, when, desc, lit
from pyspark.sql.window import Window

# Tag every per-country table with its country code in a `language` column.
# withColumn replaces an existing column of the same name, so re-running this
# cell does not add the column twice.
df_br_csv = df_br_csv.withColumn("language", lit("BR"))
df_ca_csv = df_ca_csv.withColumn("language", lit("CA"))
df_de_csv = df_de_csv.withColumn("language", lit("DE"))
df_fr_csv = df_fr_csv.withColumn("language", lit("FR"))
df_gb_csv = df_gb_csv.withColumn("language", lit("GB"))
df_in_csv = df_in_csv.withColumn("language", lit("IN"))
df_jp_csv = df_jp_csv.withColumn("language", lit("JP"))
df_kr_csv = df_kr_csv.withColumn("language", lit("KR"))
df_mx_csv = df_mx_csv.withColumn("language", lit("MX"))
df_ru_csv = df_ru_csv.withColumn("language", lit("RU"))
df_us_csv = df_us_csv.withColumn("language", lit("US"))

# Union the tables (union is positional, so all inputs must share column order)
df_csv = (
    df_ca_csv.union(df_br_csv)
    .union(df_de_csv)
    .union(df_fr_csv)
    .union(df_gb_csv)
    .union(df_in_csv)
    .union(df_jp_csv)
    .union(df_kr_csv)
    .union(df_mx_csv)
    .union(df_ru_csv)
    .union(df_us_csv)
)

# Remove fully duplicated rows
df_csv = df_csv.distinct()

# Add trending_days = number of rows sharing the same (title, language), and
# cast the counters from string to a numeric type. `long` is used instead of
# `int` because a value above 2,147,483,647 would otherwise be cast to NULL
# and silently dropped from the sums below.
w = Window.partitionBy("title","language")
df_csv = df_csv.withColumn("trending_days", count(col("title")).over(w)) \
    .withColumn("view_count", col("view_count").cast("long")) \
    .withColumn("likes", col("likes").cast("long")) \
    .withColumn("comment_count", col("comment_count").cast("long"))

# Aggregate by summing views, likes and comments over every row of a group.
# NOTE(review): if view_count/likes are cumulative snapshots per trending
# day, summing them over-counts; confirm whether max() was intended.
columns_to_keep_agg = ["title", "publishedAt", "channelTitle","categoryId","language","trending_days"]
df_csv = df_csv.groupBy(columns_to_keep_agg).agg(
    sum(col("likes")).alias("likes"),
    sum(col("view_count")).alias("views"),
    sum(col("comment_count")).alias("comment_count"),
)

# Join with the category table (inner join: rows whose categoryId has no
# match in df_json are dropped) and discard the duplicated key column.
columns_to_keep = ["title", "publishedAt", "channelTitle","category","categoryId","language","trending_days","views","likes","comment_count"]
df_final = df_csv.join(df_json, df_csv.categoryId == df_json.id).drop(df_json.id)

df_final.select(columns_to_keep).orderBy(desc("views")).show()

+-------------------------+--------------------+--------------+---------------+----------+--------+-------------+----------+---------+-------------+
|                    title|         publishedAt|  channelTitle|       category|categoryId|language|trending_days|     views|    likes|comment_count|
+-------------------------+--------------------+--------------+---------------+----------+--------+-------------+----------+---------+-------------+
|     Turn into orbeez ...|2021-07-03T04:04:57Z|        FFUNTV|  Entertainment|        24|      US|           36|4992282494|166216047|        66321|
|     BLACKPINK - ‘Pink...|2022-08-19T04:00:13Z|     BLACKPINK|          Music|        10|      US|           23|4816767457|266000963|     73459666|
|     BLACKPINK - ‘Pink...|2022-08-19T04:00:13Z|     BLACKPINK|          Music|        10|      KR|           20|4052943212|228007915|     63073918|
|     $1 vs $1,000,000,...|2023-06-10T16:00:00Z|       MrBeast|  Entertainment|        24|      US|       

In [None]:
from pyspark.sql import SparkSession

# Same data as df_final without the categoryId column.
df_final_c = df_final.drop("categoryId")

# The %scala cells read this data with spark.sql("select * from df_final_c").
# A Python variable is not visible to SQL, so register the DataFrame as a
# temporary view under that name.
df_final_c.createOrReplaceTempView("df_final_c")

display(df_final_c)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, year, when, desc, monotonically_increasing_id,avg,asc

# Channel table: total views per channel plus a generated id.
# Rows with a null channelTitle are skipped, matching the relationship tables
# below, which also exclude them.
# NOTE: monotonically_increasing_id() yields unique but non-consecutive ids
# that are not guaranteed to be the same from one run to the next.
df_canal = (
    df_final.where(col("channelTitle").isNotNull())
    .groupBy("channelTitle")
    .agg(sum(col("views")).alias("views"))
    .withColumn("id", monotonically_increasing_id() + 1)
)

# Group table
# Add the view-range bucket column. Rows matching none of the branches
# (e.g. null views) get a null rangeOfViews.
df_final = df_final.withColumn("rangeOfViews",
    when((col("views") >= 0) & (col("views") <= 1000000), "Grupo 1") \
    .when((col("views") > 1000000) & (col("views") <= 5000000), "Grupo 2") \
    .when(col("views") > 5000000, "Grupo 3"))

# One row per bucket; the null bucket is excluded for consistency with
# df_r_canal_grupo, which never references it.
df_grupo = (
    df_final.select("rangeOfViews")
    .where(col("rangeOfViews").isNotNull())
    .distinct()
    .withColumn("id", monotonically_increasing_id() + 1)
)

# Category table
df_categoria = (
    df_final.select("categoryId","category")
    .distinct()
    .withColumnRenamed("category","categoryName")
)

# Country table
df_pais = (
    df_final.select("language")
    .distinct()
    .withColumn("id", monotonically_increasing_id() + 1)
    .withColumnRenamed("language","languageName")
)

# Channel-category relationship
df_r_canal_categoria = df_final.select("category","channelTitle").distinct()
df_r_canal_categoria = df_r_canal_categoria.where(col("channelTitle").isNotNull() & col("category").isNotNull())

# Channel-country relationship
df_r_canal_paises = df_final.select("channelTitle","language").distinct()
df_r_canal_paises = df_r_canal_paises.where(col("channelTitle").isNotNull() & col("language").isNotNull())

# Channel-group relationship
df_r_canal_grupo = df_final.select("channelTitle","rangeOfViews").distinct()
df_r_canal_grupo = df_r_canal_grupo.where(col("channelTitle").isNotNull() & col("rangeOfViews").isNotNull())




In [None]:
import os

from pyspark.sql import SparkSession
#instanceid="beb1dcaf"
#instancename="Instance01"

# Connection settings.
# The password must never be stored in the notebook: it is read from the
# NEO4J_PASSWORD environment variable. The password that used to be hardcoded
# here should be considered leaked and rotated.
url = os.environ.get("NEO4J_URL", "neo4j+s://beb1dcaf.databases.neo4j.io")
username = os.environ.get("NEO4J_USERNAME", "neo4j")
password = os.environ.get("NEO4J_PASSWORD")
if not password:
    raise RuntimeError("NEO4J_PASSWORD environment variable is not set")


def _neo4j_writer(df):
    """Return a DataFrameWriter for ``df`` with the shared Neo4j settings applied."""
    return (
        df.write.format("org.neo4j.spark.DataSource")
        .mode("Overwrite")
        .option("url", url)
        .option("authentication.type", "basic")
        .option("authentication.basic.username", username)
        .option("authentication.basic.password", password)
    )


def write_nodes(df, labels, node_keys):
    """Write every row of ``df`` as a node.

    Args:
        df: DataFrame whose columns become node properties.
        labels: Value for the connector's ``labels`` option, e.g. ":Canal".
        node_keys: Value for the connector's ``node.keys`` option.
    """
    _neo4j_writer(df).option("labels", labels).option("node.keys", node_keys).save()


def write_canal_relationship(df, relationship, target_labels, target_node_keys):
    """Write every row of ``df`` as a relationship starting at a :Canal node.

    The source node is always matched on ``channelTitle``.

    Args:
        df: DataFrame with ``channelTitle`` plus the target key column.
        relationship: Relationship type, e.g. "TEM_VIDEO_NA".
        target_labels: Labels of the target node, e.g. ":Categoria".
        target_node_keys: Value for ``relationship.target.node.keys``; use the
            ``column:property`` form when the DataFrame column name differs
            from the property name stored on the target node.
    """
    (
        _neo4j_writer(df.repartition(1))
        .option("relationship", relationship)
        .option("relationship.save.strategy", "keys")
        .option("relationship.source.labels", ":Canal")
        .option("relationship.source.save.mode", "overwrite")
        .option("relationship.source.node.keys", "channelTitle")
        .option("relationship.target.labels", target_labels)
        .option("relationship.target.node.keys", target_node_keys)
        .option("relationship.target.save.mode", "overwrite")
        .save()
    )


# Node tables
write_nodes(df_categoria, ":Categoria", "categoryId")
write_nodes(df_canal, ":Canal", "id")
write_nodes(df_grupo, ":Grupo", "id")
write_nodes(df_pais, ":Pais", "id")

# Relationships.
# df_categoria and df_pais store their names as `categoryName` and
# `languageName`, while the relationship tables still call the columns
# `category` and `language`. Map column -> node property so the existing
# nodes are matched instead of new, duplicate nodes being created.
write_canal_relationship(df_r_canal_categoria, "TEM_VIDEO_NA", ":Categoria", "category:categoryName")
write_canal_relationship(df_r_canal_paises, "TEM_VIDEO_DE", ":Pais", "language:languageName")
write_canal_relationship(df_r_canal_grupo, "TEM_VIDEO_NO", ":Grupo", "rangeOfViews")

In [None]:
import os

from pyspark.sql import SparkSession

# Connection settings. The password is read from the NEO4J_PASSWORD
# environment variable instead of being stored in the notebook.
url = os.environ.get("NEO4J_URL", "neo4j+s://beb1dcaf.databases.neo4j.io")
username = os.environ.get("NEO4J_USERNAME", "neo4j")
password = os.environ.get("NEO4J_PASSWORD")
if not password:
    raise RuntimeError("NEO4J_PASSWORD environment variable is not set")

# Read back the :Categoria nodes as a smoke test of the connection.
# .show() returns None, so keep the DataFrame in test_read and display it
# in a separate statement.
test_read = spark.read.format("org.neo4j.spark.DataSource")\
    .option("url", url)\
    .option("authentication.type", "basic")\
    .option("authentication.basic.username", username)\
    .option("authentication.basic.password", password)\
    .option("labels", ":Categoria")\
    .load()

test_read.show()

In [None]:
%scala
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import spark.implicits._

// Catalog and keyspace names come from the cluster's Spark configuration.
val dbName = spark.sparkContext.getConf.get("spark.database.name")
val keyspace = spark.sparkContext.getConf.get("spark.keyspace.name")

// Register the Cassandra catalog under dbName and make the keyspace current.
spark.conf.set(s"spark.sql.catalog.$dbName", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.sql(s"use $dbName.$keyspace")

// Requires a table or temp view named df_final_c to be visible to Spark SQL.
val video_trends_table = spark.sql("select * from df_final_c")

// ifNotExists = true keeps the cell re-runnable: without it the call fails
// once the table has been created by a previous run.
video_trends_table.createCassandraTable(keyspace, "video_trends", ifNotExists = true)

video_trends_table.write.cassandraFormat("video_trends", keyspace).mode("append").save()
spark.sql("show tables").show()

// Read the table back and count its rows as a sanity check.
val data = sc.cassandraTable(keyspace, "video_trends")

data.count()


In [None]:
%scala
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import spark.implicits._
import com.datastax.spark.connector.cql.ClusteringColumn

// Catalog and keyspace names come from the cluster's Spark configuration.
val dbName = spark.sparkContext.getConf.get("spark.database.name")
val keyspace = spark.sparkContext.getConf.get("spark.keyspace.name")

// Register the Cassandra catalog under dbName and make the keyspace current.
spark.conf.set(s"spark.sql.catalog.$dbName", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.sql(s"use $dbName.$keyspace")

// Requires a table or temp view named df_final_c to be visible to Spark SQL.
val video_trends_table = spark.sql("select * from df_final_c")

// Partition by (language, category); cluster by views and trending_days,
// both descending. ifNotExists = true keeps the cell re-runnable: without it
// the call fails once the table has been created by a previous run.
video_trends_table.createCassandraTableEx(
  keyspace,
  "video_trends_language_category",
  Seq("language", "category"),
  Seq(("views", ClusteringColumn.Descending), ("trending_days", ClusteringColumn.Descending)),
  ifNotExists = true)

video_trends_table.write.cassandraFormat("video_trends_language_category", keyspace).mode("append").save()
spark.sql("show tables").show()

// Read the table back and count its rows as a sanity check.
val data = sc.cassandraTable(keyspace, "video_trends_language_category")

data.count()