## DS-3002: Sample Capstone Project
This notebook demonstrates many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-3002: Data Systems** at the University of Virginia School of Data Science. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Relational Databases Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Relational Databases Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

What's more, this project requires students to make effective decisions regarding whether to implement a Cloud-hosted, on-premises hosted, or hybrid architecture.

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure SQL Server Connection Information #####################
jdbc_hostname = "ds-2002final.mysql.database.azure.com"
jdbc_port = 1433
src_database = "movies"

connection_properties = {
  "user" : "abg5eqp",
  "password" : "Ravenclaw_2728",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "ds-midterm"
atlas_default_dbname = "movies"
atlas_user_name = "abg5eqp"
atlas_password = "Ravenclaw_2728"

# Data Files (JSON) Information ###############################
dst_database = "movies"

base_dir = "dbfs:/FileStore/ds2002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_sales_orders/bronze"
output_silver = f"{database_dir}/fact_sales_orders/silver"
output_gold   = f"{database_dir}/fact_sales_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales_orders", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[57]: False

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.rho0qln.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.rho0qln.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure SQL Database
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a View in an Azure SQL database.

In [0]:
%sql
DROP DATABASE IF EXISTS movies CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS movies
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/ds2002-capstone/movies"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Capstone Project");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_movie
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002final.mysql.database.azure.com:3306/movies_lt",
  dbtable "movies_lt.vdim_movie",
  user "abg5eqp",
  password "Ravenclaw_2728"
)

In [0]:
%sql
USE DATABASE movies;

CREATE TABLE IF NOT EXISTS movies.dim_movie
COMMENT "Movie Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-capstone/movies/dim_movie"
AS SELECT * FROM view_movie

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM movies.dim_movie LIMIT 5

movie_company_key,movie_id,company_id
1,5,14
2,5,59
3,11,1
4,11,306
5,12,3


In [0]:
%sql
DESCRIBE EXTENDED movies.dim_movie;

col_name,data_type,comment
movie_company_key,int,
movie_id,int,
company_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,movies,
Table,dim_movie,
Type,EXTERNAL,
Comment,Movie Dimension Table,


##### 1.2. Create a New Table that Sources its Data from a Table in an Azure SQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds-2002final.mysql.database.azure.com:3306/movies_lt",
  dbtable "movies_lt.vdim_date",
  user "abg5eqp",
  password "Ravenclaw_2728"
)

In [0]:
%sql
USE DATABASE movies;

CREATE TABLE IF NOT EXISTS movies.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds2002-capstone/movies/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM movies.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
%sql
DESCRIBE EXTENDED movies.dim_date;

col_name,data_type,comment
date_key,int,
full_date,string,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
display(dbutils.fs.ls(batch_dir))

path,name,size,modificationTime
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_movie.csv,dim_movie.csv,81855,1671065159000
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_movie_cast.csv,dim_movie_cast.csv,34655,1671065158000
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_movie_company.json,dim_movie_company.json,77174,1671065158000
dbfs:/FileStore/ds2002-capstone/source_data/batch/dim_production_company.json,dim_production_company.json,82196,1671065158000


##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
source_dir = '/dbfs/FileStore/ds2002-capstone/source_data/batch'
json_files = {"production_company" : 'dim_production_company.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_default_dbname, source_dir, json_files) 

Out[73]: <pymongo.results.InsertManyResult at 0x7f36efc44d00>

##### 2.3. Fetch Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

val df_production_company = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "movies").option("collection", "production_company").option("uri", "mongodb+srv://abg5eqp:Ravenclaw_2728@ds-midterm.rho0qln.mongodb.net/test").load()
display(df_production_company)

_id,company_name,production_company_key
List(639a7b3ad457b850f7526858),Lucasfilm,1
List(639a7b3ad457b850f7526859),Walt Disney Pictures,2
List(639a7b3ad457b850f752685a),Pixar Animation Studios,3
List(639a7b3ad457b850f752685b),Paramount Pictures,4
List(639a7b3ad457b850f752685c),Columbia Pictures,5
List(639a7b3ad457b850f752685d),RKO Radio Pictures,6
List(639a7b3ad457b850f752685e),DreamWorks,7
List(639a7b3ad457b850f752685f),Fine Line Features,8
List(639a7b3ad457b850f7526860),Gaumont,9
List(639a7b3ad457b850f7526861),WingNut Films,11


In [0]:
%scala
df_production_company.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (Adventure Works) Metadata Database

In [0]:
%scala
df_production_company.write.format("delta").mode("overwrite").saveAsTable("movies.dim_production_company")

In [0]:
%sql
DESCRIBE EXTENDED movies.dim_production_company

col_name,data_type,comment
_id,struct,
company_name,string,
production_company_key,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,movies,
Table,dim_production_company,
Type,MANAGED,
Location,dbfs:/FileStore/ds2002-capstone/movies/dim_production_company,


##### 2.5. Query the New Table in the Databricks Metadata Database

In [0]:
%sql
SELECT * FROM movies.dim_production_company LIMIT 5

_id,company_name,production_company_key
List(639a7b3ad457b850f7526858),Lucasfilm,1
List(639a7b3ad457b850f7526859),Walt Disney Pictures,2
List(639a7b3ad457b850f752685a),Pixar Animation Studios,3
List(639a7b3ad457b850f752685b),Paramount Pictures,4
List(639a7b3ad457b850f752685c),Columbia Pictures,5


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
address_csv = f"{batch_dir}/dim_movie_company.csv"

df_movie_company= spark.read.format('csv').options(header='true', inferSchema='true').load(address_csv)
display(df_movie_company)

movie_company_key,movie_id,company_id
1,5,14
2,5,59
3,11,1
4,11,306
5,12,3
6,13,4
7,14,27
8,14,2721
9,16,11239
10,16,119


In [0]:
df_movie_company.printSchema()

root
 |-- movie_company_key: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- company_id: integer (nullable = true)



In [0]:
df_movie_company.write.format("delta").mode("overwrite").saveAsTable("dim_movie_company")

In [0]:
%sql
DESCRIBE EXTENDED movies.dim_movie_company;

col_name,data_type,comment
movie_company_key,int,
movie_id,int,
company_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,movies,
Table,dim_movie_company,
Type,MANAGED,
Location,dbfs:/FileStore/ds2002-capstone/movies/dim_movie_company,


In [0]:
%sql
SELECT * FROM movies.dim_movie_company LIMIT 5;

movie_company_key,movie_id,company_id
1,5,14
2,5,59
3,11,1
4,11,306
5,12,3


##### Verify Dimension Tables

In [0]:
%sql
USE movies;
SHOW TABLES

database,tableName,isTemporary
movies,dim_date,False
movies,dim_movie_cast,False
movies,dim_movie_company,False
movies,dim_production_company,False
,view_date,True
,view_movie_cast,True
,view_moviecast,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", "company_id BIGINT")
 .option("cloudFiles.schemaHints", "title STRING")
 .option("cloudFiles.schemaHints", "budget BIGINT")
 .option("cloudFiles.schemaHints", "popularity DOUBLE") 
 .option("cloudFiles.schemaHints", "release_date DATE")
 .option("cloudFiles.schemaHints", "revenue BIGINT")
 .option("cloudFiles.schemaHints", "runtime BIGINT")
 .option("cloudFiles.schemaHints", "movie_status STRING")
 .option("cloudFiles.schemaHints", "vote_average DOUBLE") 
 .option("cloudFiles.schemaHints", "vote_count BIGINT")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

budget,company_id,movie_key,movie_status,popularity,release_date,revenue,runtime,title,vote_average,vote_count,_rescued_data,receipt_time,source_file
94000000,3,12,Released,85.688789,2003-05-30,940335536,100,Finding Nemo,7.6,6122,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
55000000,4,13,Released,138.133331,1994-07-06,677945399,142,Forrest Gump,8.2,7927,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
15000000,10214,68,Released,41.089863,1985-02-20,0,132,Brazil,7.5,861,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
70000000,8601,75,Released,44.090535,1996-12-12,101371017,106,Mars Attacks!,6.1,1509,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
2500000,98,76,Released,23.672571,1995-01-27,5535405,105,Before Sunrise,7.7,959,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
130000,22376,83,Released,15.611857,2004-08-06,54667954,79,Open Water,5.4,315,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
25000000,33,111,Released,70.105981,1983-12-08,65884703,170,Scarface,8.0,2948,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
15000000,7,116,Released,30.669913,2005-10-26,85306374,124,Match Point,7.3,1105,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
25000000,4,117,Released,38.272889,1987-06-02,76270454,119,The Untouchables,7.6,1384,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
15000000,10342,129,Released,118.968562,2001-07-20,274925095,125,千と千尋の神隠し,8.3,3840,,2022-12-15T02:21:14.891+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

Out[90]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f36f009d4c0>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

budget,company_id,movie_key,movie_status,popularity,release_date,revenue,runtime,title,vote_average,vote_count,_rescued_data,receipt_time,source_file
94000000,3,12,Released,85.688789,2003-05-30,940335536,100,Finding Nemo,7.6,6122,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
55000000,4,13,Released,138.133331,1994-07-06,677945399,142,Forrest Gump,8.2,7927,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
15000000,10214,68,Released,41.089863,1985-02-20,0,132,Brazil,7.5,861,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
70000000,8601,75,Released,44.090535,1996-12-12,101371017,106,Mars Attacks!,6.1,1509,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
2500000,98,76,Released,23.672571,1995-01-27,5535405,105,Before Sunrise,7.7,959,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
130000,22376,83,Released,15.611857,2004-08-06,54667954,79,Open Water,5.4,315,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
25000000,33,111,Released,70.105981,1983-12-08,65884703,170,Scarface,8.0,2948,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
15000000,7,116,Released,30.669913,2005-10-26,85306374,124,Match Point,7.3,1105,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
25000000,4,117,Released,38.272889,1987-06-02,76270454,119,The Untouchables,7.6,1384,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json
15000000,10342,129,Released,118.968562,2001-07-20,274925095,125,千と千尋の神隠し,8.3,3840,,2022-12-15T02:22:15.715+0000,dbfs:/FileStore/ds2002-capstone/source_data/stream/fact_orders_1.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
budget,bigint,
company_id,bigint,
movie_key,bigint,
movie_status,string,
popularity,double,
release_date,string,
revenue,bigint,
runtime,bigint,
title,string,
vote_average,double,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.movie_key
    , t.budget
    , t.company_id
    , t.movie_status
    , t.title
    , t.popularity
    , d.month_name AS release_month
    , d.day_name_of_week AS release_day_name
    , d.day_of_month AS release_day
    , d.calendar_year AS release_year
    , t.revenue
    , t.runtime
  FROM orders_silver_tempview t
  INNER JOIN movies.dim_movie m
  ON t.movie_key = m.movie_id
  INNER JOIN movies.dim_movie_company mc
  ON t.company_id = mc.movie_company_key
  INNER JOIN movies.dim_production_company p
  ON t.company_id = p.production_company_key
  INNER JOIN movies.dim_date d
  ON t.release_date = d.full_date)

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

Out[107]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f36efa291c0>

In [0]:
%sql
SELECT * FROM fact_orders_silver

movie_key,budget,company_id,movie_status,title,popularity,release_month,release_day_name,release_day,release_year,revenue,runtime
245,27000000,33,Released,About a Boy,28.227984,April,Friday,26,2002,129000000,101
462,52000000,216,Released,Erin Brockovich,30.347332,March,Friday,17,2000,256271286,131


In [0]:
%sql
DESCRIBE EXTENDED movies.fact_orders_silver

col_name,data_type,comment
movie_key,bigint,
budget,bigint,
company_id,bigint,
movie_status,string,
title,string,
popularity,double,
release_month,string,
release_day_name,string,
release_day,int,
release_year,int,


##### 6.4. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT company_id, AVG(revenue) as average_revenue
FROM movies.fact_orders_silver
GROUP BY company_id;

company_id,average_revenue
33,129000000.0
216,256271286.0


In [0]:
%sql
SELECT company_id, SUM(revenue) as revenue_sum
FROM movies.fact_orders_silver
GROUP BY company_id;

company_id,revenue_sum
33,129000000
216,256271286
