In [1]:
import logging, datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
from apache_beam.io.gcp.bigquery import ReadFromBigQuery, WriteToBigQuery

    
'''
5/11/2021
The following DoFn, 'MakeHasGenre', builds the junction table between titles and genres.
It takes a titleID together with its genres (a single comma-separated string) and creates a record
for each titleID and genre combination. FARM_FINGERPRINT could not be used here to create a unique id
for genreID, so the id is generated afterwards in BigQuery with MD5.
'''
class MakeHasGenre(beam.DoFn):
    """Explode one movie_title row into (titleID, genreID) junction-table rows.

    Input: a dict with 'titleID' and 'genres' keys, where 'genres' is a
    comma-separated string of genre names, or None.
    Output: a list with one dict {'titleID': ..., 'genreID': <genre name>} per
    distinct genre. genreID holds the raw genre name at this stage; it is turned
    into an id later in BigQuery.
    """

    def process(self, element):
        titleID = element['titleID']
        genres = element['genres']
        if genres is None:
            # A title without genres contributes no junction rows.
            return []
        # Strip whitespace so 'Drama, Comedy' and 'Drama,Comedy' produce the same
        # genre names, skip empty tokens left by stray commas, and drop repeats
        # within one title (dict.fromkeys keeps first-seen order).
        genre_names = dict.fromkeys(
            name.strip() for name in genres.split(',') if name.strip())
        # NOTE(review): an IMDb missing-value marker ('\N') is kept as a genre here
        # and only filtered later in SQL (genre != '\\N') — confirm this is intended.
        return [{'titleID': titleID, 'genreID': name} for name in genre_names]

        
def run():
    """Build and run the Dataflow job that populates datamart.has_genre_Dataflow.

    Reads (titleID, genres) from datamart.movie_title, explodes each genre string
    with MakeHasGenre, and writes one (titleID, genreID) row per pair. At this
    stage genreID holds the raw genre name; it is hashed later in BigQuery.
    """
    PROJECT_ID = 'coastal-well-303101'
    BUCKET = 'gs://allnaturalbrandy2021' # should we change bucket?
    # Keep all temporary artifacts (BigQuery export files, load staging files)
    # under one prefix instead of the bucket root.
    TEMP_LOCATION = BUCKET + '/temp'

    options = PipelineOptions(
        flags=None,
        runner='DataflowRunner',
        project=PROJECT_ID,
        job_name='hasgenre',
        temp_location=TEMP_LOCATION,
        region='us-central1')

    sql = 'SELECT titleID, genres FROM datamart.movie_title'

    dataset_id = 'datamart'
    table_id = PROJECT_ID + ':' + dataset_id + '.' + 'has_genre_Dataflow'
    schema_id = 'titleID:STRING,genreID:STRING'

    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         # ReadFromBigQuery is itself a PTransform; no beam.io.Read wrapper is needed.
         | 'Read from BQ' >> ReadFromBigQuery(
             query=sql, use_standard_sql=True, gcs_location=TEMP_LOCATION)
         | 'Make Genres Junction Table' >> beam.ParDo(MakeHasGenre())
         | 'Write has_genre to BQ' >> WriteToBigQuery(
             table=table_id,
             schema=schema_id,
             # Replace rather than append: appending on a re-run duplicates every
             # row, and the MD5 UPDATE cell would then hash the new rows on top of
             # the already-hashed ones.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             custom_gcs_temp_location=TEMP_LOCATION))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.ERROR)
    run()

  temp_location = pcoll.pipeline.options.view_as(
  temp_location = p.options.view_as(GoogleCloudOptions).temp_location


In [3]:
import logging, datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
from apache_beam.io.gcp.bigquery import ReadFromBigQuery, WriteToBigQuery


'''
5/11/2021
The following DoFn, 'MakeGenre', takes a dictionary containing a genre from the previously built has_genre_Dataflow table.
It simply creates a record of genreID and genre and returns it.
FARM_FINGERPRINT could not be used here to create a unique id for genreID,
so the id is generated afterwards in BigQuery with MD5.
'''
class MakeGenre(beam.DoFn):
    """Turn one distinct genreID value into a genre-table record.

    The incoming dict's 'genreID' value is copied into both the 'genreID' and
    'genre' fields of the output record. A None genreID produces no output.
    """

    def process(self, element):
        genre_name = element['genreID']
        # Guard clause: nothing to emit for a missing genre.
        if genre_name is None:
            return None
        return [{'genreID': genre_name, 'genre': genre_name}]


def run():
    """Build and run the Dataflow job that populates datamart.genre_Dataflow.

    Reads the distinct genreID values from datamart.has_genre_Dataflow, turns each
    into a {'genreID', 'genre'} record with MakeGenre, and writes them to BigQuery.

    NOTE(review): this copies whatever has_genre_Dataflow.genreID currently holds.
    If the MD5 UPDATE on has_genre_Dataflow has already run, the 'genre' column
    will receive hashes instead of names — confirm the intended cell order.
    """
    PROJECT_ID = 'coastal-well-303101'
    BUCKET = 'gs://allnaturalbrandy2021'
    # Keep all temporary artifacts under one prefix instead of the bucket root.
    TEMP_LOCATION = BUCKET + '/temp'

    options = PipelineOptions(
        flags=None,
        runner='DataflowRunner',
        project=PROJECT_ID,
        job_name='genre',
        temp_location=TEMP_LOCATION,
        region='us-central1')

    sql = 'SELECT DISTINCT genreID FROM datamart.has_genre_Dataflow'

    dataset_id = 'datamart'
    table_id = PROJECT_ID + ':' + dataset_id + '.' + 'genre_Dataflow'
    schema_id = 'genreID:STRING,genre:STRING'

    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as p:
        (p
         # ReadFromBigQuery is itself a PTransform; no beam.io.Read wrapper is needed.
         | 'Read from BQ' >> ReadFromBigQuery(
             query=sql, use_standard_sql=True, gcs_location=TEMP_LOCATION)
         | 'Make Genre Table' >> beam.ParDo(MakeGenre())
         | 'Write genre to BQ' >> WriteToBigQuery(
             table=table_id,
             schema=schema_id,
             # Replace rather than append so re-running does not duplicate genres.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             custom_gcs_temp_location=TEMP_LOCATION))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.ERROR)
    run()

5/11/2021 Creating the unique genreID for the has_genre table in BigQuery, because FARM_FINGERPRINT wasn't working in Apache Beam. We used MD5 and TO_BASE64 so the BYTES digest is easily converted to a string.

In [1]:
%%bigquery
-- Replace each raw genre name in has_genre_Dataflow with TO_BASE64(MD5(name)).
-- The REGEXP guard skips values that already look like a base64-encoded MD5
-- digest (22 base64 characters followed by '=='), so re-running this cell does
-- not hash the ids a second time and break the join with genre_Dataflow.
UPDATE datamart.has_genre_Dataflow SET genreID = TO_BASE64(MD5(genreID))
WHERE genreID IS NOT NULL
  AND NOT REGEXP_CONTAINS(genreID, r'^[A-Za-z0-9+/]{22}==$')

5/11/2021 Creating the unique genreID for the genre table in BigQuery, because FARM_FINGERPRINT wasn't working in Apache Beam. We used MD5 and TO_BASE64 so the BYTES digest is easily converted to a string.

In [3]:
%%bigquery
-- Set genreID to TO_BASE64(MD5(genre name)). The MakeGenre Dataflow job writes
-- the same value into genreID and genre, so on the first run this equals hashing
-- genreID. Hashing the untouched 'genre' column keeps the cell idempotent:
-- re-running it produces the same ids instead of hashing a hash.
UPDATE datamart.genre_Dataflow SET genreID = TO_BASE64(MD5(genre))
WHERE genre IS NOT NULL

In [5]:
%%bigquery
-- Sanity check: equal values mean titleID is a unique key of movie_title.
SELECT
  COUNT(*),
  COUNT(DISTINCT mt.titleID)
FROM datamart.movie_title AS mt

Unnamed: 0,f0_,f1_
0,7821715,7821715


In [6]:
%%bigquery
-- Integrity check: has_genre_Dataflow rows whose titleID has no matching
-- movie_title row (also surfaces movie_title rows whose titleID is NULL).
-- An empty result means every junction row points at an existing title.
SELECT *
FROM datamart.movie_title AS title_tbl
  FULL OUTER JOIN datamart.has_genre_Dataflow AS link_tbl
    ON title_tbl.titleID = link_tbl.titleID
WHERE title_tbl.titleID IS NULL

Unnamed: 0,titleID,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runTimeMinutes,genres,titleID_1,genreID


In [7]:
%%bigquery
-- Integrity check: genreIDs used in has_genre_Dataflow with no row in
-- genre_Dataflow (also surfaces genre_Dataflow rows whose genreID is NULL).
-- An empty result means every junction row resolves to a genre.
SELECT
  genre_tbl.genreID AS genreID,
  link_tbl.genreID AS has_genreID
FROM datamart.genre_Dataflow AS genre_tbl
  FULL OUTER JOIN datamart.has_genre_Dataflow AS link_tbl
    ON genre_tbl.genreID = link_tbl.genreID
WHERE genre_tbl.genreID IS NULL

Unnamed: 0,genreID,has_genreID


In [8]:
%%bigquery
-- Sanity check: equal values mean no (titleID, genreID) pair is duplicated.
-- The pair is serialized with TO_JSON_STRING(STRUCT(...)) rather than
-- titleID || genreID. Plain concatenation can map different pairs to the same
-- string (e.g. 'ab' || 'c' = 'a' || 'bc'), and it yields NULL, which COUNT(DISTINCT)
-- skips, whenever either side is NULL.
SELECT COUNT(*), COUNT(DISTINCT TO_JSON_STRING(STRUCT(titleID, genreID)))
FROM datamart.has_genre_Dataflow

Unnamed: 0,f0_,f1_
0,12421361,12421361


In [9]:
%%bigquery
-- Sanity check: equal values mean genreID is a unique key of genre_Dataflow.
SELECT
  COUNT(*),
  COUNT(DISTINCT gt.genreID)
FROM datamart.genre_Dataflow AS gt

Unnamed: 0,f0_,f1_
0,234,234


5/11/2021
Adding an industry column so that movie titles can be queried across both datasets (Hollywood and Bollywood).

In [7]:
%%bigquery
-- IF NOT EXISTS makes the cell safe to re-run; a plain ADD COLUMN fails once
-- the column already exists.
ALTER TABLE datamart.movie_title
ADD COLUMN IF NOT EXISTS industry STRING

5/11/2021
Dropping the genres column: it is obsolete, because its information is now represented properly in the schema by the has_genre and genre tables.

In [14]:
%%bigquery
-- IF EXISTS makes the cell safe to re-run; a plain DROP COLUMN fails once the
-- column is gone.
-- NOTE(review): the has_genre Dataflow job reads movie_title.genres, so it must
-- run before this cell; after the drop that table cannot be regenerated from here.
ALTER TABLE datamart.movie_title
DROP COLUMN IF EXISTS genres

5/11/2021
Setting the industry for Hollywood records, identified by a titleID shorter than 25 characters. The titleIDs for Bollywood movies were generated manually with a length known to be longer than the titleIDs already present for movies from the IMDb dataset, so titleID length distinguishes the two sources.

In [10]:
%%bigquery
-- Rows with a short (IMDb-sourced) titleID are tagged as Hollywood.
UPDATE datamart.movie_title
SET industry = 'hollywood'
WHERE LENGTH(titleID) < 25

5/11/2021
Setting the industry for Bollywood records, identified by their longer titleID. The titleIDs for Bollywood movies were generated manually with a length known to be longer than the titleIDs already present for movies from the IMDb dataset.

In [12]:
%%bigquery
-- Rows with a long, manually generated titleID are tagged as Bollywood.
-- '>=' makes this the exact complement of the Hollywood rule (LENGTH(titleID) < 25).
-- With '>' a titleID of exactly 25 characters received no industry at all.
UPDATE datamart.movie_title SET industry = 'bollywood'
WHERE LENGTH(titleID) >= 25

5/13/2021
The query below finds the movie titles shared between the two datasets and returns the five genres with the most movies among those titles, in descending order of movie count.

In [33]:
%%bigquery
-- Top 5 genres (by joined movie/genre rows) among primaryTitles shared across industries.
-- CREATE OR REPLACE keeps the cell re-runnable; plain CREATE VIEW fails once the view exists.
CREATE OR REPLACE VIEW reports.test_third_query_actual AS
SELECT genre, COUNT(genre) AS movie_count
FROM datamart.movie_title AS out
          JOIN datamart.has_genre_Dataflow AS hgOut
          ON out.titleID = hgOut.titleID
          JOIN datamart.genre_Dataflow AS gOut
          ON hgOut.genreID = gOut.genreID
WHERE primaryTitle IN (
    -- primaryTitles that pass the per-industry filter in more than one industry group.
    SELECT primaryTitle
    FROM (SELECT primaryTitle, industry, COUNT(primaryTitle) AS num
         FROM datamart.movie_title AS m
          JOIN datamart.has_genre_Dataflow AS hg
          ON m.titleID = hg.titleID
          JOIN datamart.genre_Dataflow AS g
          ON hg.genreID = g.genreID
         GROUP BY primaryTitle, industry
         -- NOTE(review): num counts joined title x genre rows, so a title that occurs
         -- once with two or more genres also passes num > 1. Confirm this is intended.
         HAVING num > 1) AS m1
    GROUP BY primaryTitle
    HAVING COUNT(primaryTitle) > 1)
GROUP BY genre
ORDER BY movie_count DESC
LIMIT 5

5/13/2021
The query below finds the movie titles shared between the two datasets and, for each title, reports its average start year and its number of occurrences, ordered by average year.

In [10]:
%%bigquery
-- For each primaryTitle that passes the per-industry filter in more than one
-- industry group: its average start year and the number of movies with that title.
-- CREATE OR REPLACE keeps the cell re-runnable; plain CREATE VIEW fails once the view exists.
CREATE OR REPLACE VIEW reports.test_second_query AS
    SELECT CAST(AVG(startYear) AS INT64) AS average_year, primaryTitle, SUM(num_titles) AS num_movies
    FROM (SELECT AVG(startYear) AS startYear, primaryTitle,
                 -- num counts joined title x genre rows; it only drives the HAVING filter.
                 COUNT(primaryTitle) AS num,
                 -- num_titles counts distinct movies, so a movie with several genres is
                 -- counted once in num_movies instead of once per genre.
                 COUNT(DISTINCT m.titleID) AS num_titles,
                 industry
         FROM datamart.movie_title AS m
          JOIN datamart.has_genre_Dataflow AS hg
          ON m.titleID = hg.titleID
          JOIN datamart.genre_Dataflow AS g
          ON hg.genreID = g.genreID
         GROUP BY primaryTitle, industry
         -- NOTE(review): because num counts genre rows, a title that occurs once with
         -- two or more genres also passes num > 1. Confirm this is intended.
         HAVING num > 1) AS m1
    GROUP BY primaryTitle
    HAVING COUNT(primaryTitle) > 1
    ORDER BY average_year ASC

5/13/2021
The query below groups records by genre, startYear, and industry to count the movies in each genre for a given year and industry (years 1910–2021, keeping only groups with at least 20 movies).

In [7]:
%%bigquery
-- Per (startYear, genre, industry): number of movie/genre rows, for years
-- 1910-2021 and only for groups with at least 20 rows.
-- CREATE OR REPLACE keeps the cell re-runnable; plain CREATE VIEW fails once the view exists.
CREATE OR REPLACE VIEW reports.test_first_query AS
SELECT startYear AS year, genre, COUNT(genre) AS movie_count, industry
    FROM datamart.movie_title AS m
    JOIN datamart.has_genre_Dataflow AS hg
    ON m.titleID = hg.titleID
    JOIN datamart.genre_Dataflow AS g
    ON g.genreID = hg.genreID
    -- '\\N' is excluded as a non-genre value. The year range is filtered before
    -- grouping: startYear is a grouping key, so this matches the former HAVING
    -- filter while aggregating fewer rows.
    WHERE genre != '\\N'
      AND startYear BETWEEN 1910 AND 2021
    GROUP BY genre, startYear, industry
    HAVING movie_count >= 20
    ORDER BY year