# ETL (Extract, Transform and Load)
## Trabalho Final | Prática | Grupo G
Base de dados usada: https://www.kaggle.com/datasets/whenamancodes/popular-movies-datasets-58000-movies?select=tags.csv

Slide: [Canva](https://www.canva.com/design/DAFYb14LbAA/J_Kk7ndEoZM1m3Tw_glTIA/edit?utm_content=DAFYb14LbAA&utm_campaign=designshare&utm_medium=link2&utm_source=sharebutton)

|Alunos|Matrícula|
|--|--|
|Victor Buendia Cruz De Alvim|19/0020601|
|Lucas Ursulino Boaventura|18/0114093|
|Yudi Yamane de Azevedo|16/0149410|

##  ⚙️ Setup ===========================

In [1]:
pip install opendatasets pandas pandasql findspark pyspark sqlalchemy ipython-sql psycopg2 --quiet

Note: you may need to restart the kernel to use updated packages.


In [2]:
import getpass
import os
import re

import opendatasets as od
import pandas
import sqlalchemy
from pandasql import sqldf


In [3]:
import findspark
findspark.init()  # called before the pyspark imports below so that pyspark can be located
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# getOrCreate() reuses a session that is already running in this kernel, so the
# cell can be executed more than once. Constructing SparkContext('local')
# directly raises on a second execution because a context is already active.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

# Alternative, tuned configuration kept for reference:
# spark = SparkSession.builder \
#     .master('local[*]') \
#     .config("spark.executor.cores", "4") \
#     .config("spark.executor.memoryOverhead",200) \
#     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
#     .config("spark.default.parallelism", "4") \
#     .config("spark.kryoserializer.buffer.max", "2000mb") \
#     .getOrCreate()

23/02/12 13:01:07 WARN Utils: Your hostname, lucas resolves to a loopback address: 127.0.1.1; using 192.168.0.31 instead (on interface wlp0s20f3)
23/02/12 13:01:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/12 13:01:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# The PostgreSQL connection URL is read from the DATABASE_URL environment
# variable so that no credentials are stored in the notebook.
# NOTE(review): the URL that used to be hardcoded here contained a password and
# must be considered leaked — rotate those credentials.
db = sqlalchemy.create_engine(os.environ['DATABASE_URL'])

In [5]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

## ⬇️ Extract =============================

In [6]:
# Download the Kaggle dataset only when its folder is not already on disk.
if not os.path.exists('./popular-movies-datasets-58000-movies/'):
    # getpass does not echo the token, so it never ends up in the saved cell output.
    kaggleAPI = getpass.getpass('[TOKEN API] Insire seu Token API do Kaggle:')

    try:
        # The token is written to ./kaggle.json for the duration of the download
        # (presumably where opendatasets looks for it — TODO confirm).
        with open('kaggle.json', 'w') as fp:
            fp.write(kaggleAPI)

        od.download(
            'https://www.kaggle.com/datasets/whenamancodes/popular-movies-datasets-58000-movies?select=tags.csv')
    finally:
        # Always remove the credentials file, even when the download fails.
        if os.path.exists('kaggle.json'):
            os.remove('kaggle.json')

In [7]:
# Map every file name found under the dataset folder to its path, printing each path.
csvs = {}
for root, _subdirs, names in os.walk('./popular-movies-datasets-58000-movies/'):
    for name in names:
        path = os.path.join(root, name)
        csvs[name] = path
        print(path)

./popular-movies-datasets-58000-movies/movies.csv
./popular-movies-datasets-58000-movies/genome-scores.csv
./popular-movies-datasets-58000-movies/ratings.csv
./popular-movies-datasets-58000-movies/links.csv
./popular-movies-datasets-58000-movies/genome-tags.csv
./popular-movies-datasets-58000-movies/tags.csv


In [8]:
dfs = {}

# Read every CSV (first row used as header) and expose it twice under the same
# name: as a DataFrame in `dfs` and as a Spark temp view. The name is the file
# name without its ".csv" suffix and with '-' replaced by '_'.
for filename, file in csvs.items():
    newData = spark.read.format("csv").option("header","true").load(file)
    view_name = re.sub('-', '_', re.findall("(.+).csv", filename)[0])
    dfs[view_name] = newData
    newData.createOrReplaceTempView(view_name)

In [9]:
# Print the name of every loaded DataFrame followed by its repr (column names and types).
for name, frame in dfs.items():
    print(name,'\n |-->', frame, '\n')

movies 
 |--> DataFrame[movieId: string, title: string, genres: string] 

genome_scores 
 |--> DataFrame[movieId: string, tagId: string, relevance: string] 

ratings 
 |--> DataFrame[userId: string, movieId: string, rating: string, timestamp: string] 

links 
 |--> DataFrame[movieId: string, imdbId: string, tmdbId: string] 

genome_tags 
 |--> DataFrame[tagId: string, tag: string] 

tags 
 |--> DataFrame[userId: string, movieId: string, tag: string, timestamp: string] 



## ✨ Transform =========================

In [10]:
def sqlR(sql):
    """Run `sql` on the Spark session and return the whole result as a pandas DataFrame."""
    result = spark.sql(sql)
    return result.toPandas()

def tView(sql, name):
    """Register the result of `sql` as the Spark temp view `name`, replacing a view of the same name."""
    query_result = spark.sql(sql)
    query_result.createOrReplaceTempView(name)

---
## Movies

In [13]:
# Start from the raw CSV DataFrame rather than from the `movies` temp view: the
# view is overwritten by this cell (and again further down), so reading it back
# would make the cell fail when it is executed a second time.
movies_raw = dfs['movies']

# Raw strings are used for the patterns so the backslashes reach the regex
# engine without relying on Python's handling of invalid escape sequences.
# First four-digit number in parentheses found in the title, '' when there is none.
year_in_title = F.regexp_extract(F.col('title'), r'\((\d\d\d\d)\)', 1)
# Text that precedes a "(YYYY)" located at the very end of the title, '' when there is none.
title_without_year = F.trim(F.regexp_extract(F.col('title'), r'(.+)\(\d\d\d\d\)$', 1))

movies_raw\
.withColumn('genres', F.split(movies_raw['genres'], r'\|'))\
.withColumn('publish_year', F.when(year_in_title == '', None).otherwise(year_in_title))\
.withColumn('title', F.when(title_without_year == '', F.col('title')).otherwise(title_without_year))\
.createOrReplaceTempView('movies')

sql = """
    SELECT * FROM movies
"""

sqlR(sql)


                                                                                

Unnamed: 0,movieId,title,genres,publish_year
0,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1995
1,2,Jumanji,"[Adventure, Children, Fantasy]",1995
2,3,Grumpier Old Men,"[Comedy, Romance]",1995
3,4,Waiting to Exhale,"[Comedy, Drama, Romance]",1995
4,5,Father of the Bride Part II,[Comedy],1995
...,...,...,...,...
58093,193876,The Great Glinka,[(no genres listed)],1946
58094,193878,Les tribulations d'une caissière,[Comedy],2011
58095,193880,Her Name Was Mumu,[Drama],2016
58096,193882,Flora,"[Adventure, Drama, Horror, Sci-Fi]",2017


In [14]:
# Per-movie rating statistics, registered as the `movie_reviews` view:
# average rating, number of distinct rating users and number of ratings.
# The LEFT JOIN keeps movies that have no row in `ratings`.
sql = """

WITH M AS (
    SELECT
        M.movieId
        , M.title
        , M.genres
        , M.publish_year
        , SUM(R.rating)/COUNT(R.rating) AS average_rating
        , COUNT(DISTINCT R.userId) AS user_review_amount
        , COUNT(R.rating) AS review_amount
    FROM
        movies AS M
    LEFT JOIN
        ratings AS R
        ON TRUE
            AND M.movieId = R.movieId
    GROUP BY
        1,2,3,4
)

SELECT * FROM M


"""

# tView registers the query as a temp view; sqlR runs it and displays the result.
tView(sql, 'movie_reviews')
sqlR(sql)

                                                                                

Unnamed: 0,movieId,title,genres,publish_year,average_rating,user_review_amount,review_amount
0,100010,Battle of Los Angeles,"[Action, Sci-Fi]",2011,2.478261,46,46
1,100060,Sunny (Sseo-ni),[Drama],2011,3.645161,31,31
2,100062,My Way (Mai Wei),"[Action, Drama, War]",2011,3.621622,37,37
3,100068,Comme un chef,[Comedy],2012,3.568627,102,102
4,100070,Punching the Clown,[Comedy],2009,3.538462,13,13
...,...,...,...,...,...,...,...
58093,990,Maximum Risk,"[Action, Adventure, Thriller]",1996,2.731707,984,984
58094,99043,Trishna,[Drama],2011,3.153846,13,13
58095,99566,"True Meaning of Christmas Specials, The",[Comedy],2002,3.500000,1,1
58096,99600,"Man Who Haunted Himself, The","[Drama, Fantasy, Horror, Mystery, Thriller]",1970,3.300000,5,5


In [15]:
# Per-movie tag statistics, registered as the `movie_tags` view:
# number of distinct tagging users, number of tags and the set of distinct tags.
# The LEFT JOIN keeps movies that have no row in `tags`.
sql = """

WITH M AS (
    SELECT
        M.movieId
        , M.title
        , M.genres
        , M.publish_year
        , COUNT(DISTINCT T.userId) AS user_tag_amount
        , COUNT(T.tag) AS tag_amount
        , ARRAY_AGG(DISTINCT T.tag) AS tags
    FROM
        movies AS M
    LEFT JOIN
        tags AS T
        ON TRUE
            AND T.movieId = M.movieId
    GROUP BY
        1,2,3,4
)

SELECT * FROM M


"""

# tView registers the query as a temp view; sqlR runs it and displays the result.
tView(sql, 'movie_tags')
sqlR(sql)

                                                                                

Unnamed: 0,movieId,title,genres,publish_year,user_tag_amount,tag_amount,tags
0,1,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",1995,252,782,"[UNLIKELY FRIENDSHIPS, Want, fanciful, light, ..."
1,10,GoldenEye,"[Action, Adventure, Thriller]",1995,64,152,"[CLV, memorable lines, Memorable Characters, B..."
2,100,City Hall,"[Drama, Thriller]",1996,9,20,"[undercover agent, investigation, presidential..."
3,1000,Curdled,[Crime],1996,2,10,"[bakery, murder, samurai sword, cleaning lady,..."
4,100001,"Comic, The","[Comedy, Drama]",1969,1,1,[silent film star]
...,...,...,...,...,...,...,...
58093,99989,Bonsái,[Drama],2011,2,2,"[Cristián Jiménez, nudity (topless)]"
58094,99992,Shadow Dancer,"[Crime, Drama, Thriller]",2012,4,7,"[Clive Owen, prospect preferred, IRA, spy, Jam..."
58095,99994,Thale,"[Action, Drama, Horror, Mystery]",2012,4,13,"[Original, Visually Appealing, scandinavian ta..."
58096,99996,It's a Disaster,"[Comedy, Drama]",2012,9,25,"[dark comedy, Comedy, Julia Stiles, group of f..."


In [16]:
# Combine the cleaned `movies` view with the `movie_reviews` and `movie_tags`
# aggregates (joined on movieId) into the `movies_df` view, ordered by title.
sql = """

SELECT
    M.movieId
    , M.title
    , M.genres
    , M.publish_year
    , MT.user_tag_amount
    , MT.tag_amount
    , MT.tags
    , MR.average_rating
    , MR.user_review_amount
    , MR.review_amount
FROM
    movies AS M
LEFT JOIN
    movie_reviews AS MR
    ON TRUE
        AND M.movieId = MR.movieId
LEFT JOIN
    movie_tags AS MT
    ON TRUE
        AND M.movieId = MT.movieId
ORDER BY
    M.title


"""


tView(sql, 'movies_df')
sqlR(sql)

                                                                                

Unnamed: 0,movieId,title,genres,publish_year,user_tag_amount,tag_amount,tags,average_rating,user_review_amount,review_amount
0,51372,"""""""Great Performances"""" Cats (1998)""",[Musical],1998,3,10,"[slow motion, BD-R, Andrew Lloyd Webber, music...",2.815556,225,225
1,7789,"""11'09""""01 - September 11 (2002)""",[Drama],2002,6,15,"[9/11, woman director, library, elderly, skysc...",3.367424,132,132
2,112809,"""Diebuster """"Top wo Narae 2"""" (2004)""","[Action, Animation, Drama, Sci-Fi]",2004,2,5,"[anime, Gainax, Mecha, Sci-Fi]",3.375000,12,12
3,174029,"""Divers at Work on the Wreck of the """"Maine"""" ...",[(no genres listed)],1898,1,3,"[docudrama, diver, silent film]",3.000000,3,3
4,182853,"""How Viktor """"The Garlic"""" Took Alexey """"The S...",[Drama],2017,2,4,"[foreign, russian, black humor]",3.718750,32,32
...,...,...,...,...,...,...,...,...,...,...
58093,159678,…And the Fifth Horseman Is Fear,"[Drama, War]",1965,0,0,[],3.250000,2,2
58094,132604,キサラギ,"[Comedy, Mystery]",2007,1,2,"[small room, comedy]",3.666667,3,3
58095,132319,チェブラーシカ,"[Animation, Children]",2010,1,4,"[talking animals, puppet, USSR, Russian]",3.150000,10,10
58096,130640,貞子3D,[Horror],2012,2,7,"[vengeful ghost, ringu, online, onryo-dead gir...",2.136364,11,11


In [17]:
# Add the imdbId / tmdbId columns from `links` to `movies_df` and register the
# result under the name `movies`, replacing the view of that name created earlier.
sql = """

SELECT
    M.movieId
    , L.imdbId
    , L.tmdbId
    , M.title
    , M.genres
    , M.publish_year
    , M.user_tag_amount
    , M.tag_amount
    , M.tags
    , M.average_rating
    , M.user_review_amount
    , M.review_amount
FROM
    movies_df AS M
LEFT JOIN
    links AS L
    ON TRUE
        AND M.movieId = L.movieId

"""


tView(sql, 'movies')
sqlR(sql)

                                                                                

Unnamed: 0,movieId,imdbId,tmdbId,title,genres,publish_year,user_tag_amount,tag_amount,tags,average_rating,user_review_amount,review_amount
0,100010,1758570,59197,Battle of Los Angeles,"[Action, Sci-Fi]",2011,4,14,"[los angeles, spaceship, alien, shootout, horr...",2.478261,46,46
1,100060,1937339,77117,Sunny (Sseo-ni),[Drama],2011,3,5,"[experimental, bullying, gang, Korea, netflix ...",3.645161,31,31
2,100062,1606384,94047,My Way (Mai Wei),"[Action, Drama, War]",2011,1,1,[world war ii],3.621622,37,37
3,100068,1911553,85872,Comme un chef,[Comedy],2012,4,5,"[Michael Youn, cooking, funny]",3.568627,102,102
4,100070,1192624,47863,Punching the Clown,[Comedy],2009,2,5,"[comedy, struggling career, comedian, good hum...",3.538462,13,13
...,...,...,...,...,...,...,...,...,...,...,...,...
58093,99600,0066053,4910,"Man Who Haunted Himself, The","[Drama, Fantasy, Horror, Mystery, Thriller]",1970,2,7,"[ghost, madness, car crash, cult film, reality...",3.300000,5,5
58094,99675,2085002,127901,Eat Sleep Die (Äta sova dö),[Drama],2012,4,7,"[realistic, immigrants, Gabriela Pichler, work...",3.736842,19,19
58095,99811,1931388,97610,Beware of Mr. Baker,[Documentary],2012,4,9,"[rock and roll, drums, Life Story, drummer, Ja...",3.641509,53,53
58096,99901,1869593,,O Panishyros Megistanas Ton Ninja,"[Action, Comedy, Sci-Fi]",2008,1,3,"[funny, low budget, cult]",2.125000,4,4


In [18]:
# Bring every row of the final `movies` view to the driver as a list of Row
# objects; the "Load Movies" cell iterates over this list.
movies = spark.sql('SELECT * FROM movies;').collect()

                                                                                

---
## Ratings

In [12]:
# The CSV `timestamp` column (a string) converted to a Spark timestamp via bigint.
rating_ts = F.col('timestamp').cast('bigint').cast(dataType=T.TimestampType())

# Add `dt` (the date part) and replace `timestamp` with its typed version,
# then re-register the result under the same view name.
ratings_typed = (
    spark.table('ratings')
    .withColumn('dt', F.to_date(rating_ts, 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('timestamp', rating_ts)
)
ratings_typed.createOrReplaceTempView('ratings')

sql = """

SELECT
    *
FROM
    ratings
"""


tView(sql, 'ratings')
# sqlR(sql)

In [13]:
# Bring ratings to the driver as a list of Row objects for the "Load Ratings" cell.
# Only 1,000,000 rows are collected; the query has no ORDER BY, so which rows
# are returned is not fixed by the query itself.
ratings = spark.sql('SELECT * FROM ratings LIMIT 1000000;').collect()

                                                                                

---
## Tags

In [11]:
# The CSV `timestamp` column (a string) converted to a Spark timestamp via bigint.
tag_ts = F.col('timestamp').cast('bigint').cast(dataType=T.TimestampType())

# Add `dt` (the date part) and replace `timestamp` with its typed version,
# then re-register the result under the same view name.
tags_typed = (
    spark.table('tags')
    .withColumn('dt', F.to_date(tag_ts, 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('timestamp', tag_ts)
)
tags_typed.createOrReplaceTempView('tags')

sql = """

SELECT
    *
FROM
    tags

"""


tView(sql, 'tags')
sqlR(sql)

  series = series.astype(t, copy=False)


Unnamed: 0,userId,movieId,tag,timestamp,dt
0,14,110,epic,2015-09-24 23:35:38,2015-09-24
1,14,110,Medieval,2015-09-24 23:35:32,2015-09-24
2,14,260,sci-fi,2015-09-13 15:36:50,2015-09-13
3,14,260,space action,2015-09-13 15:37:01,2015-09-13
4,14,318,imdb top 250,2015-09-18 19:26:35,2015-09-18
...,...,...,...,...,...
1108992,283206,73017,fun,2010-01-24 22:24:19,2010-01-24
1108993,283206,73017,homoerotic subtext,2010-01-24 22:24:18,2010-01-24
1108994,283206,73017,pacing,2010-01-24 22:24:18,2010-01-24
1108995,283206,73017,plot,2010-01-24 22:24:18,2010-01-24


In [12]:
# Bring every row of the `tags` view to the driver as a list of Row objects;
# the "Load Tags" cell iterates over this list.
tags = spark.sql('SELECT * FROM tags;').collect()

                                                                                

---
## Genres

In [23]:
# Build the `genres` view: one row per (genre, publish_year) with the number of
# distinct movies. E explodes the `genres` array into one row per movie/genre;
# F counts the movies of each pair. Movies whose publish_year is NULL form
# their own groups.
sql = """

WITH E AS (
    SELECT
        movieId
        , publish_year
        , EXPLODE(genres) AS genre
    FROM
        movies
)

, F AS (
    SELECT
        genre
        , publish_year
        , COUNT(DISTINCT movieId) AS movies
    FROM
        E
    GROUP BY
        1,2
    ORDER BY
        2,1
)

SELECT * FROM F

"""


tView(sql, 'genres')
sqlR(sql)

Unnamed: 0,genre,publish_year,movies
0,(no genres listed),,212
1,Action,,16
2,Adventure,,9
3,Animation,,2
4,Children,,5
...,...,...,...
2068,Romance,2018,70
2069,Sci-Fi,2018,70
2070,Thriller,2018,144
2071,War,2018,11


In [24]:
# Bring every row of the `genres` view to the driver as a list of Row objects;
# the "Load Genres" cell iterates over this list.
genres = spark.sql('SELECT * FROM genres;').collect()

                                                                                

---
## Load ☁️ ===========================

In [24]:
# DataFrame handles for the current temp views, keyed by view name.
tempViews = {
    view_name: spark.table(view_name)
    for view_name in ('movies', 'tags', 'ratings', 'genres', 'genome_scores', 'genome_tags')
}

# NOTE(review): the loop below prints the raw CSV DataFrames held in `dfs`,
# not the `tempViews` built above — confirm which one was meant to be shown.
for name, frame in dfs.items():
    print(name,'\n |-->', frame, '\n')

links 
 |--> DataFrame[movieId: string, imdbId: string, tmdbId: string] 

tags 
 |--> DataFrame[userId: string, movieId: string, tag: string, timestamp: string] 

genome_tags 
 |--> DataFrame[tagId: string, tag: string] 

ratings 
 |--> DataFrame[userId: string, movieId: string, rating: string, timestamp: string] 

genome_scores 
 |--> DataFrame[movieId: string, tagId: string, relevance: string] 

movies 
 |--> DataFrame[movieId: string, title: string, genres: string] 



In [22]:
# Helpers to format values
import datetime

def write_to_file(text, file):
    """Write `text` to the path `file`, replacing any existing content.

    The file is always encoded as UTF-8 so that non-ASCII text does not depend
    on the platform's default encoding, and the handle is closed even when the
    write raises.
    """
    with open(file, "w", encoding="utf-8") as f:
        f.write(text)


def strstr(string):
    """Return `string` wrapped in single quotes after removing troublesome characters.

    Triple double-quotes, square brackets, backslashes and commas are dropped;
    apostrophes and colons are each replaced by a space.
    """
    # (text to look for, replacement), applied in this order.
    substitutions = (
        ('"""', ''),
        ("'", " "),
        ("[", ""),
        ("]", ""),
        ("\\", ""),
        (":", " "),
        (",", ""),
    )
    for needle, replacement in substitutions:
        if needle in string:
            string = string.replace(needle, replacement)

    return f"'{string}'"


def array(arr):
    """Render `arr` as the text ``ARRAY [...]``, or ``'null'`` when it is empty.

    Each element is quoted with ``strstr`` and the Python list repr of those
    quoted elements is used as the bracketed part.
    """
    if len(arr) == 0:
        return 'null'

    quoted_items = [strstr(item) for item in arr]
    format_arr = f"ARRAY {quoted_items}"

    # Clean up what the list repr adds around the already-quoted items:
    # double quotes and backslashes are removed, colons become spaces and
    # doubled apostrophes are collapsed into one.
    for needle, replacement in (('"""', ''), ('"', ""), ("\\", ""), (":", " "), ("''", "'")):
        format_arr = format_arr.replace(needle, replacement)

    return format_arr


def to_int(number):
    """Convert `number` with ``int()``; ``None`` is returned as the string ``'null'``."""
    return 'null' if number is None else int(number)


def to_float(number):
    """Convert `number` with ``float()``; ``None`` is returned as the string ``'null'``."""
    return 'null' if number is None else float(number)


def to_datetime(date):
    """Render `date` as a quoted ``'YYYY-MM-DD HH:MM:SS'`` literal.

    Accepted inputs:
      * a ``datetime.datetime`` (subclasses included);
      * a string of digits, read as a Unix timestamp in seconds and converted
        with ``datetime.datetime.fromtimestamp`` (local time).

    Everything else — ``None``, non-numeric strings, other types, or a
    timestamp that cannot be converted — returns the string ``'null'``.
    """
    if isinstance(date, str):
        if not date.isnumeric():
            return 'null'
        try:
            date = datetime.datetime.fromtimestamp(int(date))
        except (ValueError, OverflowError, OSError):
            # isnumeric() accepts characters int() rejects, and fromtimestamp
            # raises for values outside the platform's supported range.
            return 'null'

    if not isinstance(date, datetime.datetime):
        return 'null'

    try:
        return f"'{date.strftime('%Y-%m-%d %H:%M:%S')}'"
    except ValueError:
        return 'null'


def to_date(date):
    """Render `date` as a quoted ``'YYYY-MM-DD'`` literal.

    Accepted inputs:
      * a ``datetime.date`` or ``datetime.datetime`` (subclasses included);
      * a string of digits, read as a Unix timestamp in seconds and converted
        with ``datetime.datetime.fromtimestamp`` (local time).

    Everything else — ``None``, non-numeric strings, other types, or a
    timestamp that cannot be converted — returns the string ``'null'``.
    """
    if isinstance(date, str):
        if not date.isnumeric():
            return 'null'
        try:
            date = datetime.datetime.fromtimestamp(int(date))
        except (ValueError, OverflowError, OSError):
            # isnumeric() accepts characters int() rejects, and fromtimestamp
            # raises for values outside the platform's supported range.
            return 'null'

    # datetime.datetime is a subclass of datetime.date, so both are accepted here.
    if not isinstance(date, datetime.date):
        return 'null'

    try:
        return f"'{date.strftime('%Y-%m-%d')}'"
    except ValueError:
        return 'null'

In [21]:
# Load Movies

# Column name -> helper that renders the column's value as SQL text.
format_to_movies = {
    'movieId': to_int,
    'imdbId': to_int,
    'tmdbId': to_int,
    'title': strstr,
    'genres': array,
    'publish_year': to_int,
    'user_tag_amount': to_int,
    'tag_amount': to_int,
    'tags': array,
    'average_rating': to_float,
    'user_review_amount': to_int,
    'review_amount': to_int
}

def format_movie_values(movie):
    """Return a dict with the same keys as `movie` whose values are rendered by `format_to_movies`."""
    return {key:format_to_movies[key](movie[key]) for key in movie.keys()}

# NOTE(review): the statement is assembled as text from formatted values
# instead of using bound parameters.
with db.connect() as con:
    insert_header = "INSERT INTO movies (movieId, imdbId, tmdbId, title, genres, publish_year, user_tag_amount, tag_amount, tags, average_rating, user_review_amount, review_amount) \n\tVALUES "
    format_movies = list(map(lambda x: x.asDict(), movies))

    # One "(v1, v2, ...)" entry per row. The entries are joined once at the end,
    # so a row that fails to format is simply left out and the statement stays
    # well-formed even when that row is the last one.
    value_rows = []
    for idx_movie, movie in enumerate(format_movies):
        print(f'---> Prepare insert [{idx_movie}]...')

        try:
            new_movie = list(format_movie_values(movie).values())
        except Exception as e:
            print(f'Index error: {idx_movie}')
            print(e)
            continue

        value_rows.append('(' + ', '.join(f'{value}' for value in new_movie) + ')')

    insert_sql = insert_header + ',\n\t'.join(value_rows) + ';'

    print('---> Start to write sql to file ...')
    # NOTE(review): the purpose of this replacement is not evident from this cell — confirm before changing it.
    insert_sql = insert_sql.replace("', ',", "', ")
    write_to_file(insert_sql, "./sql/popula_movies.sql")

    print('---> Start to bulk insert values to database...')
    con.execute(sqlalchemy.text(insert_sql))
    print('---> Commiting changes into database...')
    con.commit()
    print('---> Finally routine!')

---> Prepare insert [0]...
---> Prepare insert [1]...
---> Prepare insert [2]...
---> Prepare insert [3]...
---> Prepare insert [4]...
---> Prepare insert [5]...
---> Prepare insert [6]...
---> Prepare insert [7]...
---> Prepare insert [8]...
---> Prepare insert [9]...
---> Prepare insert [10]...
---> Prepare insert [11]...
---> Prepare insert [12]...
---> Prepare insert [13]...
---> Prepare insert [14]...
---> Prepare insert [15]...
---> Prepare insert [16]...
---> Prepare insert [17]...
---> Prepare insert [18]...
---> Prepare insert [19]...
---> Prepare insert [20]...
---> Prepare insert [21]...
---> Prepare insert [22]...
---> Prepare insert [23]...
---> Prepare insert [24]...
---> Prepare insert [25]...
---> Prepare insert [26]...
---> Prepare insert [27]...
---> Prepare insert [28]...
---> Prepare insert [29]...
---> Prepare insert [30]...
---> Prepare insert [31]...
---> Prepare insert [32]...
---> Prepare insert [33]...
---> Prepare insert [34]...
---> Prepare insert [35]...
--

In [None]:
# Load Ratings

# Column name -> helper that renders the column's value as SQL text.
format_to_ratings = {
    'userId': to_int,
    'movieId': to_int,
    'rating': to_float,
    'timestamp': to_datetime,
    'dt': to_date
}

def format_rating_values(rating):
    """Return a dict with the same keys as `rating` whose values are rendered by `format_to_ratings`."""
    return {key:format_to_ratings[key](rating[key]) for key in rating.keys()}

# NOTE(review): the statement is assembled as text from formatted values
# instead of using bound parameters.
with db.connect() as con:
    insert_header = "INSERT INTO ratings (userId, movieId, rating, timestamp, dt) \n\tVALUES "
    format_ratings = list(map(lambda x: x.asDict(), ratings))

    # One "(v1, v2, ...)" entry per row. The entries are joined once at the end,
    # so a row that fails to format is simply left out and the statement stays
    # well-formed even when that row is the last one.
    value_rows = []
    for idx_rating, rating in enumerate(format_ratings):
        print(f'---> Prepare insert [{idx_rating}]...')

        try:
            new_rating = list(format_rating_values(rating).values())
        except Exception as e:
            # Report the index of the rating being processed (this used to
            # print `idx_movie`, a variable that belongs to another cell).
            print(f'Index error: {idx_rating}')
            print(e)
            continue

        value_rows.append('(' + ', '.join(f'{value}' for value in new_rating) + ')')

    insert_sql = insert_header + ',\n\t'.join(value_rows) + ';\n'

    print('---> Start to write sql to file ...')
    # NOTE(review): the purpose of this replacement is not evident from this cell — confirm before changing it.
    insert_sql = insert_sql.replace("', ',", "', ")
    write_to_file(insert_sql, './sql/popula_ratings.sql')

    print('---> Start to bulk insert values to database...')
    con.execute(sqlalchemy.text(insert_sql))

    print('---> Commiting changes into database...')
    con.commit()
    print('---> Finally routine!')

In [15]:
# Load Tags

# Column name -> helper that renders the column's value as SQL text.
format_to_tags = {
    'userId': to_int,
    'movieId': to_int,
    'tag': strstr,
    'timestamp': to_datetime,
    'dt': to_date
}

def format_tag_values(tag):
    """Return a dict with the same keys as `tag` whose values are rendered by `format_to_tags`."""
    return {key:format_to_tags[key](tag[key]) for key in tag.keys()}

# NOTE(review): the statement is assembled as text from formatted values
# instead of using bound parameters.
with db.connect() as con:
    insert_header = "INSERT INTO tags (userId, movieId, tag, timestamp, dt) \n\tVALUES "
    format_tags = list(map(lambda x: x.asDict(), tags))

    # One "(v1, v2, ...)" entry per row. The entries are joined once at the end,
    # so a row that fails to format is simply left out and the statement stays
    # well-formed even when that row is the last one.
    value_rows = []
    for idx_tag, tag in enumerate(format_tags):
        # print(f'---> Prepare insert [{idx_tag}]...')

        try:
            new_tag = list(format_tag_values(tag).values())
        except Exception as e:
            print(f'Index error: {idx_tag}')
            print(e)
            print(tag)
            continue

        value_rows.append('(' + ', '.join(f'{value}' for value in new_tag) + ')')

    insert_sql = insert_header + ',\n\t'.join(value_rows) + ';'

    print('---> Start to write sql to file ...')
    # NOTE(review): the purpose of this replacement is not evident from this cell — confirm before changing it.
    insert_sql = insert_sql.replace("', ',", "', ")
    # write_to_file(insert_sql, "./sql/popula_tags.sql")

    print('---> Start to bulk insert values to database...')
    con.execute(sqlalchemy.text(insert_sql))

    print('---> Commiting changes into database...')
    con.commit()
    print('---> Finally routine!')


---> Start to write sql to file ...
---> Start to bulk insert values to database...
---> Commiting changes into database...
---> Finally routine!


In [29]:
# Load Genres

# Column name -> helper that renders the column's value as SQL text.
format_to_genres = {
    'genre': strstr,
    'publish_year': to_int,
    'movies': to_int
}

def format_genre_values(genre):
    """Return a dict with the same keys as `genre` whose values are rendered by `format_to_genres`."""
    return {key:format_to_genres[key](genre[key]) for key in genre.keys()}

# NOTE(review): the statement is assembled as text from formatted values
# instead of using bound parameters.
with db.connect() as con:
    insert_header = "INSERT INTO genres (genre, publish_year, movies) \n\tVALUES "
    format_genres = list(map(lambda x: x.asDict(), genres))

    # One "(v1, v2, ...)" entry per row. The entries are joined once at the end,
    # so a row that fails to format is simply left out and the statement stays
    # well-formed even when that row is the last one.
    value_rows = []
    for idx_genre, genre in enumerate(format_genres):
        print(f'---> Prepare insert [{idx_genre}]...')

        try:
            new_genre = list(format_genre_values(genre).values())
        except Exception as e:
            print(f'Index error: {idx_genre}')
            print(e)
            print(genre)
            continue

        value_rows.append('(' + ', '.join(f'{value}' for value in new_genre) + ')')

    insert_sql = insert_header + ',\n\t'.join(value_rows) + ';'

    print('---> Start to write sql to file ...')
    # NOTE(review): the purpose of this replacement is not evident from this cell — confirm before changing it.
    insert_sql = insert_sql.replace("', ',", "', ")
    write_to_file(insert_sql, "./sql/popula_genres.sql")

    print('---> Start to bulk insert values to database...')
    con.execute(sqlalchemy.text(insert_sql))
    print('---> Commiting changes into database...')
    con.commit()
    print('---> Finally routine!')

---> Prepare insert [0]...
---> Prepare insert [1]...
---> Prepare insert [2]...
---> Prepare insert [3]...
---> Prepare insert [4]...
---> Prepare insert [5]...
---> Prepare insert [6]...
---> Prepare insert [7]...
---> Prepare insert [8]...
---> Prepare insert [9]...
---> Prepare insert [10]...
---> Prepare insert [11]...
---> Prepare insert [12]...
---> Prepare insert [13]...
---> Prepare insert [14]...
---> Prepare insert [15]...
---> Prepare insert [16]...
---> Prepare insert [17]...
---> Prepare insert [18]...
---> Prepare insert [19]...
---> Prepare insert [20]...
---> Prepare insert [21]...
---> Prepare insert [22]...
---> Prepare insert [23]...
---> Prepare insert [24]...
---> Prepare insert [25]...
---> Prepare insert [26]...
---> Prepare insert [27]...
---> Prepare insert [28]...
---> Prepare insert [29]...
---> Prepare insert [30]...
---> Prepare insert [31]...
---> Prepare insert [32]...
---> Prepare insert [33]...
---> Prepare insert [34]...
---> Prepare insert [35]...
--

---
### Rascunho

In [None]:
# Scratch query: list the movies whose title is NULL or empty (at most 327 rows).
# The `--` lines inside the string are earlier filters that were tried and left
# commented out in the SQL.
sql = """

SELECT * FROM movies
--WHERE title LIKE '%(500)%' OR title LIKE '%(69)%'
--WHERE publish_year = ''
--WHERE title LIKE '%Bicycle, Spoon, Apple%'
--OR title LIKE '%Millions Game, The%'
--WHERE publish_year IS NULL
WHERE title IS NULL OR title = ''
LIMIT 327
"""


# The sqlR result is not the cell's last expression, so it is not displayed;
# only the untruncated .show() output below is printed.
sqlR(sql)
spark.sql(sql).show(truncate=False,n=327)

In [11]:
# Scratch query: number of rows in the `genome_tags` view.
sql = """

SELECT
    COUNT(*)
FROM
    genome_tags

"""


sqlR(sql)

Unnamed: 0,count(1)
0,1128
