In [2]:
#import necessary functions from pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, col, to_date, date_format
from pyspark.sql.types import IntegerType
#open a pyspark session and read in the csv data
spark = SparkSession.builder.appName('cleaned_films').getOrCreate()
df = spark.read.options(header='True', inferSchema='True').csv('netflix_titles.csv')

In [88]:
#apply necessary transformations for the netflix file
filtered = df.filter(df.type.contains('Movie'))\
    .withColumn('streaming_service', lit('Netflix'))\
    .withColumn('adult', when(col('rating') == 'R', True).otherwise(False))\
    .withColumn('american', when((col('country').like('%United States%')), True).otherwise(False))\
    .withColumn('airing_date', date_format(to_date(col('date_added'), 'MMMM d, yyyy'),'yyyy-MM-dd'))\
    .withColumn('runtime', when(split(col('duration'),' ').getItem(0).cast(IntegerType()) < 60, '< 60 mins')\
                           .when(split(col('duration'),' ').getItem(0).cast(IntegerType()) > 120, '> 2 hrs')
                           .otherwise('1-2 hrs'))\
    .drop('show_id','type','cast','date_added','release_year','rating', 'listed_in', 'description')

In [89]:
#output to parquet file
filtered.write.parquet('cleaned_films')

In [92]:
#import necessary functions from pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, col, to_date, date_format
from pyspark.sql.types import IntegerType
#open a pyspark session and read in the csv data
spark = SparkSession.builder.appName('cleaned_films').getOrCreate()
df1 = spark.read.options(header='True', inferSchema='True').csv('amazon_prime_titles.csv')

In [97]:
#apply necessary transformations for the amazon prime file
filtered1 = df1.filter(df1.type.contains('Movie'))\
    .withColumn('streaming_service', lit('Amazon Prime'))\
    .withColumn('adult', when((col('rating') == 'R') | (col('rating') == '18+'), True).otherwise(False))\
    .withColumn('american', when((col('country').like('%United States%')), True).otherwise(False))\
    .withColumn('airing_date', date_format(to_date(col('date_added'), 'MMMM d, yyyy'),'yyyy-MM-dd'))\
    .withColumn('runtime', when(split(col('duration'),' ').getItem(0).cast(IntegerType()) < 60, '< 60 mins')\
                           .when(split(col('duration'),' ').getItem(0).cast(IntegerType()) > 120, '> 2 hrs')
                           .otherwise('1-2 hrs'))\
    .drop('show_id','type','cast','date_added','release_year','rating', 'listed_in', 'description')

In [98]:
#append the existing cleaned_films file
filtered1.write.mode('append').parquet('cleaned_films')

In [100]:
#import necessary functions from pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, col, to_date, date_format
from pyspark.sql.types import IntegerType
#open a pyspark session and read in the csv data
spark = SparkSession.builder.appName('cleaned_films').getOrCreate()
df2 = spark.read.options(header='True', inferSchema='True').csv('disney_plus_titles.csv')

In [103]:
#apply necessary transformations for the disney plus file
filtered2 = df2.filter(df2.type.contains('Movie'))\
    .withColumn('streaming_service', lit('Disney Plus'))\
    .withColumn('adult', when((col('rating') == 'R') | (col('rating') == '18+'), True).otherwise(False))\
    .withColumn('american', when((col('country').like('%United States%')), True).otherwise(False))\
    .withColumn('airing_date', date_format(to_date(col('date_added'), 'MMMM d, yyyy'),'yyyy-MM-dd'))\
    .withColumn('runtime', when(split(col('duration'),' ').getItem(0).cast(IntegerType()) < 60, '< 60 mins')\
                           .when(split(col('duration'),' ').getItem(0).cast(IntegerType()) > 120, '> 2 hrs')
                           .otherwise('1-2 hrs'))\
    .drop('show_id','type','cast','date_added','release_year','rating', 'listed_in', 'description')

In [106]:
#append the existing cleaned_films file
filtered2.write.mode('append').parquet('cleaned_films')

In [5]:
#reads the parquet file and performs basic checks to see that appending worked as expected
parqdf = spark.read.parquet('cleaned_films')
parqdf.head(5)

[Row(title='The Grand Seduction', director='Don McKellar', country='Canada', duration='113 min', streaming_service='Amazon Prime', adult=False, american=False, airing_date='2021-03-30', runtime='1-2 hrs'),
 Row(title='Take Care Good Night', director='Girish Joshi', country='India', duration='110 min', streaming_service='Amazon Prime', adult=False, american=False, airing_date='2021-03-30', runtime='1-2 hrs'),
 Row(title='Secrets of Deception', director='Josh Webber', country='United States', duration='74 min', streaming_service='Amazon Prime', adult=False, american=True, airing_date='2021-03-30', runtime='1-2 hrs'),
 Row(title='Pink: Staying True', director='Sonia Anderson', country='United States', duration='69 min', streaming_service='Amazon Prime', adult=False, american=True, airing_date='2021-03-30', runtime='1-2 hrs'),
 Row(title='Monster Maker', director='Giles Foster', country='United Kingdom', duration='45 min', streaming_service='Amazon Prime', adult=False, american=False, airi

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cleaned_films').getOrCreate()
import great_expectations as ge
df2 = spark.read.options(header='True', inferSchema='True').csv('disney_plus_titles.csv')

In [23]:
#converts the dataset to a great expectations object
df_ge = ge.dataset.SparkDFDataset(df2)
#checks that there are type and title columns
print(df_ge.expect_column_to_exist('type','title'))

{
  "result": {},
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "expectation_config": {
    "meta": {},
    "kwargs": {
      "column": "type",
      "result_format": "BASIC"
    },
    "expectation_type": "expect_column_to_exist"
  }
}


22/01/13 15:10:12 WARN CacheManager: Asked to cache already cached data.


In [19]:
#checks that the minimum amount of data is present in the table
print(df_ge.expect_table_column_count_to_be_between(5,12))

{
  "result": {
    "observed_value": 12
  },
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "expectation_config": {
    "meta": {},
    "kwargs": {
      "min_value": 5,
      "max_value": 12,
      "result_format": "BASIC"
    },
    "expectation_type": "expect_table_column_count_to_be_between"
  }
}


In [27]:
#checks that the values in the type column are as expected
print(df_ge.expect_column_values_to_be_in_set('type',['Movie', 'TV Show']))

{
  "result": {
    "element_count": 1450,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "expectation_config": {
    "meta": {},
    "kwargs": {
      "column": "type",
      "value_set": [
        "Movie",
        "TV Show"
      ],
      "result_format": "BASIC"
    },
    "expectation_type": "expect_column_values_to_be_in_set"
  }
}
