In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as fc
from pyspark.sql.functions import column as col,udf
import pyspark.sql.types as tp
from datetime import datetime as dt
import kaggle



In [2]:
spark = SparkSession.builder\
.config("spark.jars.packages","com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.4")\
.config("spark.hadoop.orc.overwrite.output.file","true")\
.getOrCreate()
spark

# Netflix Catalog

### Load catalog data and get ride of malformed rows

In [4]:
kaggle.api.dataset_download_files(dataset="shivamb/netflix-shows")

In [3]:
netflix_catalog_path='/Users/brayanjules/Projects/personal/data engineer/datasets/raw_netflix_catalog'
catalog = spark.read.csv(netflix_catalog_path,inferSchema=True, header=True,mode="DROPMALFORMED")

In [4]:
catalog.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [263]:
catalog_ordered=catalog.orderBy(fc.desc('title'))
catalog_ordered.limit(3).toPandas()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,81011096,Movie,최강전사 미니특공대 : 영웅의 탄생,Young Jun Lee,"Um Sang-hyun, Yang Jeong-hwa, Jeon Tae-yeol, S...",,"September 1, 2018",2018,TV-Y7-FV,68 min,Children & Family Movies,"Miniforce, a special task force of elite range..."
1,80226357,Movie,반드시 잡는다,Hong-seon Kim,Baek Yoon-sik,South Korea,"February 28, 2018",2017,TV-MA,110 min,"Dramas, International Movies, Thrillers",After people in his town start turning up dead...
2,80226338,TV Show,마녀사냥,,"Si-kyung Sung, Se-yoon Yoo, Dong-yup Shin, Ji-...",South Korea,"February 19, 2018",2015,TV-MA,1 Season,"International TV Shows, Korean TV Shows, Stand...",Four Korean celebrity men and guest stars of b...


## Netflix Catalog Data Cleaning

### Deduplication of the data

In [5]:
duplicatedContent=catalog.groupBy(['title']).count().orderBy(fc.desc('count'))
duplicatedContent.limit(5).toPandas()

Unnamed: 0,title,count
0,Oh My Ghost,3
1,Tunnel,3
2,The Silence,3
3,Love,3
4,Limitless,3


#### Understanding duplication

In [6]:
catalog.where(catalog.title=='The Silence').toPandas()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,80244078,Movie,The Silence,Gajendra Ahire,"Raghuvir Yadav, Nagraj Manjule, Anjali Patil, ...",India,"March 1, 2018",2017,TV-MA,90 min,"Dramas, International Movies","On a train in Mumbai, 20-something Chini witne..."
1,80238292,Movie,The Silence,Gajendra Ahire,"Raghuvir Yadav, Nagraj Manjule, Anjali Patil, ...",India,"March 1, 2018",2015,TV-MA,91 min,"Dramas, International Movies","After encountering a scene of sexual violence,..."
2,81021447,Movie,The Silence,John R. Leonetti,"Stanley Tucci, Kiernan Shipka, Miranda Otto, K...",Germany,"April 10, 2019",2019,TV-14,91 min,"Horror Movies, Thrillers",With the world under attack by deadly creature...


In [7]:
catalog.where(catalog.title=='The Silence').dropDuplicates(['title','director']).toPandas()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,81021447,Movie,The Silence,John R. Leonetti,"Stanley Tucci, Kiernan Shipka, Miranda Otto, K...",Germany,"April 10, 2019",2019,TV-14,91 min,"Horror Movies, Thrillers",With the world under attack by deadly creature...
1,80244078,Movie,The Silence,Gajendra Ahire,"Raghuvir Yadav, Nagraj Manjule, Anjali Patil, ...",India,"March 1, 2018",2017,TV-MA,90 min,"Dramas, International Movies","On a train in Mumbai, 20-something Chini witne..."


#### Drop duplicated rows( those are the one that have the same title and director)

In [10]:
non_duplicated_content=catalog.dropDuplicates(['title','director']).orderBy(fc.desc('title'))
non_duplicated_content.groupBy(['title','director']).count().orderBy(fc.desc('count')).limit(3).toPandas()

Unnamed: 0,title,director,count
0,You're My Boss,Antoinette Jadaone,1
1,You're Everything To Me,Tolga Örnek,1
2,You Get Me,Brent Bonacorso,1


### Understand if there is movies without description

In [14]:
no_valid_content=catalog.where((catalog.title).isNotNull() & (catalog.description).isNull())
no_valid_content.limit(5).toPandas()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description


### Drop rows without title or director.

#### Verify if there are rows without title or director

In [53]:
non_duplicated_content.select('title','type','country','cast').orderBy(fc.asc('title')).limit(5).toPandas()

Unnamed: 0,title,type,country,cast
0,,William Wyler,1944,"March 31, 2017"
1,,,,
2,"""Behind """"The Cove"""": The Quiet Japanese Speak...",Movie,"Japan, United States",
3,"""Escape from the """"Liberty"""" Cinema""",Movie,Poland,"Janusz Gajos, Zbigniew Zamachowski, Teresa Mar..."
4,"""Gabriel """"Fluffy"""" Iglesias: One Show Fits All""",Movie,,Gabriel Iglesias


In [54]:
non_duplicated_content \
.where((non_duplicated_content.title.isNull()) | (non_duplicated_content.director.isNull())) \
.limit(3).toPandas()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,80226338,TV Show,마녀사냥,,"Si-kyung Sung, Se-yoon Yoo, Dong-yup Shin, Ji-...",South Korea,"February 19, 2018",2015,TV-MA,1 Season,"International TV Shows, Korean TV Shows, Stand...",Four Korean celebrity men and guest stars of b...
1,80136789,TV Show,海的儿子,,"Li Nanxing, Christopher Lee, Jesseca Liu, Appl...",,"April 27, 2018",2016,TV-PG,1 Season,"International TV Shows, TV Dramas","Two brothers start a new life in Singapore, wh..."
2,80990464,TV Show,忍者ハットリくん,,,Japan,"December 23, 2018",2012,TV-Y7,2 Seasons,"Anime Series, Kids' TV","Hailing from the mountains of Iga, Kanzo Hatto..."


In [55]:
content_with_title=non_duplicated_content.dropna('any',subset=['title','director']).orderBy(fc.asc('title'))
content_with_title=content_with_title.withColumn('title',fc.translate('title','"',''))
content_with_title.limit(3).toPandas()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,80132127,Movie,Behind The Cove: The Quiet Japanese Speak Out,Keiko Yagi,,"Japan, United States","August 25, 2017",2015,TV-14,105 min,"Documentaries, International Movies",After a documentary about the Japanese whaling...
1,81168345,Movie,Escape from the Liberty Cinema,Wojciech Marczewski,"Janusz Gajos, Zbigniew Zamachowski, Teresa Mar...",Poland,"October 1, 2019",1990,TV-MA,88 min,"Comedies, Dramas, Independent Movies",Artistic rebellion ignites at the movies when ...
2,81087095,Movie,#Roxy,Michael Kennedy,"Jake Short, Sarah Fisher, Booboo Stewart, Dann...",Canada,"April 10, 2019",2018,TV-14,105 min,"Comedies, Romantic Movies",A teenage hacker with a huge nose helps a cool...


### Fix of column types

In [58]:
content_with_title=content_with_title.withColumn('show_id',col('show_id').cast(tp.LongType()))
content_with_title=content_with_title.withColumn('release_year',col('release_year').cast(tp.IntegerType()))
content_with_title=content_with_title.withColumn('date_added',fc.to_date('date_added','MMMMM dd, yyyy'))

In [59]:
topDirector=content_with_title.dropna('any',subset=['director']).where((catalog.type=='Movie')).groupBy('director').count().orderBy(fc.desc('count'))
topDirector.limit(3).toPandas()

Unnamed: 0,director,count
0,"Raúl Campos, Jan Suter",18
1,Marcus Raboy,14
2,Jay Karas,13


In [60]:
content_with_title.printSchema()

root
 |-- show_id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: date (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [273]:
content_with_title.count()

6172

In [23]:
content_with_title.write.parquet('/Users/brayanjules/Projects/personal/data engineer/datasets/netflix_catalog/catalog.parquet','overwrite')

# Reddit Netflix comments

In [3]:
import praw
import os
import pandas as pd
from pyspark.sql import Row

In [4]:
os.getcwd()

'/Users/brayanjules/Projects/personal/data engineer/nanodegre/capstone-project/notebooks'

In [5]:
reddit = praw.Reddit(client_id='k6twSlTNOdnGjQ',client_secret='NrQ-rdSKMOJM17yj3hO4apbmTis'
                     ,
                     user_agent='academic_comments_understanding:v1 by /u/zekeja') ## Use the praw.init when possible

### SubReddit Search by content ( netflix,NetflixBestOf,bestofnetflix)

In [68]:
redditSchema=tp.StructType([tp.StructField('show_title',tp.StringType(),True),
                            tp.StructField('show_director',tp.StringType(),True),
               tp.StructField('submission_id',tp.StringType(),True),
               tp.StructField('source',tp.StringType(),True),
               tp.StructField('title',tp.StringType(),True),
               tp.StructField('description',tp.StringType(),True),
               tp.StructField('created_utc',tp.TimestampType(),True),
               tp.StructField('author',tp.StringType(),True),
               tp.StructField('score',tp.IntegerType(),True),
               tp.StructField('spoiler',tp.BooleanType(),True),
               tp.StructField('is_original_content',tp.BooleanType(),True),
               tp.StructField('distinguished',tp.StringType(),True),
               tp.StructField('link',tp.StringType(),True),             
               tp.StructField('comments',tp.ArrayType(tp.StructType([
                   tp.StructField('comment_id',tp.StringType(),True),
                   tp.StructField('body',tp.StringType(),True),
                   tp.StructField('created_utc',tp.TimestampType(),True),
                   tp.StructField('score',tp.IntegerType(),True),
                   tp.StructField('parent_id',tp.StringType(),True),
                   tp.StructField('submission_id',tp.StringType(),True)]
               )),True)
              ])


### Random Generate 1000.0000 of comments for testing propouse

In [66]:
import random
import string
import datetime as dat
def randomString(stringLength=8):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))

def random_date(start, end):
    """
    This function will return a random datetime between two datetime
    objects.
    """
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = random.randrange(int_delta)
    return start + dat.timedelta(seconds=random_second)

In [67]:
content_rows=[]
index = 1
for content in content_with_title.collect():
    title_split=content.title.split(":",1)
    content_title=title_split[0]
    row_comments = []
    cur_date = dat.datetime.now()
    score = random.randint(0,100)
    for source in ['netflix','tvshows']:
        for index in range(0,250):
            comment = randomString(random.randint(8,50))

            created_date = random_date(cur_date - dat.timedelta(days=120),cur_date)
            row_comments.append((str(index),comment,created_date,score,
                                       'default_parent','default_link')) 
        current_sm=(content.title,content.director,index,source,content.title,'selftext',
                    cur_date,'default_author',
                      score,False,False,'','',row_comments)
        content_rows.append(current_sm) 
        index+=1

In [25]:
content_rows=[]
for content in content_with_title.limit(20).collect():
    title_split=content.title.split(":",1)
    content_title=title_split[0]
    subreddit=reddit.subreddit('netflix')
    for sm in subreddit.search('"'+content_title+'"',sort='new'):
        sm.comments.replace_more(limit=None)
        #print(sm.title)
        row_comments = []
        for comment in sm.comments.list():
            row_comments.append((comment.id,comment.body,dt.fromtimestamp(float(comment.created_utc)),comment.score,
                                       comment.parent_id,comment.link_id)) 
        current_sm=(content.title,content.director,sm.id,subreddit.display_name,sm.title,sm.selftext,
                    dt.fromtimestamp(float(sm.created_utc)),sm.author.name,
                      sm.score,sm.spoiler,sm.is_original_content,sm.distinguished,sm.permalink,row_comments)
        content_rows.append(current_sm)    

In [69]:
rdt_netflix_content=spark.createDataFrame(content_rows,redditSchema)
rdt_netflix_content.printSchema()

root
 |-- show_title: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- author: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- spoiler: boolean (nullable = true)
 |-- is_original_content: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- link: string (nullable = true)
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comment_id: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_utc: timestamp (nullable = true)
 |    |    |-- score: integer (nullable = true)
 |    |    |-- parent_id: string (nullable = true)
 |    |    |-- submission_id: string (nullable = true)



In [70]:
rdt_netflix_content.orderBy(fc.desc('created_utc')).limit(3).toPandas()

Unnamed: 0,show_title,submission_id,source,title,description,created_utc,author,score,spoiler,is_original_content,distinguished,link,comments
0,최강전사 미니특공대 : 영웅의 탄생,249,tvshows,최강전사 미니특공대 : 영웅의 탄생,selftext,2020-05-02 19:38:06.516381,default_author,98,False,False,,,"[(0, vmlmxgfynljprvejsncqbmcyetzqghckhgwexb, 2..."
1,최강전사 미니특공대 : 영웅의 탄생,249,netflix,최강전사 미니특공대 : 영웅의 탄생,selftext,2020-05-02 19:38:06.516381,default_author,98,False,False,,,"[(0, vmlmxgfynljprvejsncqbmcyetzqghckhgwexb, 2..."
2,반드시 잡는다,249,netflix,반드시 잡는다,selftext,2020-05-02 19:38:06.498860,default_author,97,False,False,,,"[(0, oegpqhqeywpruksxlhg, 2020-01-23 17:18:56...."


In [28]:
rdt_netflix_content.limit(5).toPandas()

Unnamed: 0,show_title,submission_id,source,title,description,created_utc,author,score,spoiler,is_original_content,distinguished,link,comments
0,Behind The Cove: The Quiet Japanese Speak Out,98y3ld,netflix,"[UK] Netflix no longer has The Cove, the Oscar...",I appreciate having both sides of the argument...,2018-08-20 19:22:31,lewis_pritchard,141,False,False,,/r/netflix/comments/98y3ld/uk_netflix_no_longe...,"[(e4jp9zf, It's highly unlikely they know the ..."
1,#Roxy,g5uizp,netflix,Reciving Error M7111-5059 but I don't use any ...,Just today I have been unable to view any con...,2020-04-22 00:20:45,Atromix_,2,False,False,,/r/netflix/comments/g5uizp/reciving_error_m711...,"[(fo5muf4, Your ISP is probably using a shared..."
2,#Selfie,e8v216,netflix,Weird shirtless gym selfie on Netflix Twitter ...,[https://twitter.com/netflix/status/120445343...,2019-12-10 16:07:43,OrganicCorndawg,1,False,False,,/r/netflix/comments/e8v216/weird_shirtless_gym...,"[(faeoqgb, This response seems to explain it:\..."
3,#Selfie,99vbkh,netflix,Selfie From Hell Movie: Ending Explained + Wha...,,2018-08-24 04:14:30,PaulTweddle,5,False,False,,/r/netflix/comments/99vbkh/selfie_from_hell_mo...,[]
4,#realityhigh,8z4t4l,netflix,what the FUCK,was at a party right now (we all bet on croati...,2018-07-15 16:36:44,dickensian_nightmare,0,False,False,,/r/netflix/comments/8z4t4l/what_the_fuck/,"[(e2g8bs1, I’ll Give you an upvote because of ..."


In [275]:
content_with_title.select('title').groupBy('title') \
.agg(fc.count('title').alias('qty')).filter(fc.col('qty')>1).toPandas()

Unnamed: 0,title,qty


In [279]:
rdt_netflix_content.select('show_title').groupBy('show_title') \
.agg(fc.count('show_title').alias('qty')).filter(fc.col('qty')>1).orderBy(fc.desc('qty')).toPandas()

Unnamed: 0,show_title,qty


In [71]:
rdt_netflix_content.write.parquet(path="/Users/brayanjules/Projects/personal/data engineer/datasets/reddit_netflix/comments.parquet",mode='overwrite')

In [65]:
result_test=content_with_title.limit(5).rdd.flatMap(lambda x:getRedditComments(reddit,x,'netflix',redditSchema))
final_result=spark.createDataFrame(result_test,redditSchema)

In [21]:
def test_x(x):
    return x

In [66]:
final_result.toPandas()

Unnamed: 0,show_id,submission_id,source,title,description,created_utc,author,score,spoiler,is_original_content,distinguished,link,comments
0,80132127,98y3ld,netflix,"[UK] Netflix no longer has The Cove, the Oscar...",I appreciate having both sides of the argument...,2018-08-20 19:22:31,lewis_pritchard,141,False,False,,/r/netflix/comments/98y3ld/uk_netflix_no_longe...,"[(e4jp9zf, It's highly unlikely they know the ..."
1,81087095,g5uizp,netflix,Reciving Error M7111-5059 but I don't use any ...,Just today I have been unable to view any con...,2020-04-22 00:20:45,Atromix_,2,False,False,,/r/netflix/comments/g5uizp/reciving_error_m711...,"[(fo5muf4, Your ISP is probably using a shared..."
2,81092768,99vbkh,netflix,Selfie From Hell Movie: Ending Explained + Wha...,,2018-08-24 04:14:30,PaulTweddle,4,False,False,,/r/netflix/comments/99vbkh/selfie_from_hell_mo...,[]
3,81092768,e8v216,netflix,Weird shirtless gym selfie on Netflix Twitter ...,[https://twitter.com/netflix/status/120445343...,2019-12-10 16:07:43,OrganicCorndawg,1,False,False,,/r/netflix/comments/e8v216/weird_shirtless_gym...,"[(faeoqgb, This response seems to explain it:\..."


In [17]:
stored_rows = SparkContext.accumulator(0,[],[])


In [18]:
def add_new_acc_value(rows):
    global stored_rows
    stored_rows.value.append(rows)

In [19]:
stored_rows.value


[]

In [None]:
#@udf(redditSchema)
def getRedditComments(reddit, show, sub_reddit, reddit_schema):
    content_rows = []
    title_split = show.title.split(":", 1)
    content_title = title_split[0]
    subreddit = reddit.subreddit(sub_reddit)
    for sm in subreddit.search('"' + content_title + '"', sort='top'):
        sm.comments.replace_more(limit=None)
        row_comments = []
        for comment in sm.comments.list():
            #row_comments.append(1)
            row_comments.append(
                (comment.id, comment.body, dt.fromtimestamp(float(comment.created_utc)), comment.score,
                 comment.parent_id, comment.link_id))
        current_sm = (show.show_id, sm.id, subreddit.display_name, sm.title, sm.selftext,
                      dt.fromtimestamp(float(sm.created_utc)), sm.author.name,
                      sm.score, sm.spoiler, sm.is_original_content, sm.distinguished, sm.permalink, row_comments)
        content_rows.append(current_sm)
        # self.log.info('Data extracted from subreddit: {}'.format(sub_reddit))
    return content_rows


In [74]:
a = "netflix  NetflixBestOf"
a.split()

['netflix', 'NetflixBestOf']

In [55]:
#@udf(redditSchema)
def getRedditComments(reddit, show, sub_reddit, reddit_schema):
    content_rows = []
    title_split = show.title.split(":", 1)
    content_title = title_split[0]
    subreddit = reddit.subreddit(sub_reddit)
    for sm in subreddit.search('"' + content_title + '"', sort='top'):
        sm.comments.replace_more(limit=None)
        row_comments = []
        for comment in sm.comments.list():
            #row_comments.append(1)
            row_comments.append(
                (comment.id, comment.body, dt.fromtimestamp(float(comment.created_utc)), comment.score,
                 comment.parent_id, comment.link_id))
        current_sm = (show.show_id, sm.id, subreddit.display_name, sm.title, sm.selftext,
                      dt.fromtimestamp(float(sm.created_utc)), sm.author.name,
                      sm.score, sm.spoiler, sm.is_original_content, sm.distinguished, sm.permalink, row_comments)
        content_rows.append(current_sm)
        # self.log.info('Data extracted from subreddit: {}'.format(sub_reddit))
    return content_rows


In [87]:
catalog_df = spark.read.parquet('/Users/brayanjules/Projects/personal/data engineer/datasets/netflix_catalog/catalog.parquet',)
comments_df = spark.read.parquet('/Users/brayanjules/Projects/personal/data engineer/datasets/reddit_netflix/comments.parquet')
        

In [88]:
unnest_catalog = catalog_df.select(catalog_df.date_added, catalog_df.release_year, catalog_df.title,
                                           catalog_df.type, catalog_df.duration, catalog_df.description,
                                           catalog_df.director,
                                           fc.explode(fc.split(catalog_df.cast, ',')).alias('actor'))

In [89]:
unnest_catalog.printSchema()

root
 |-- date_added: date (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- actor: string (nullable = true)



In [98]:
unnest_comments = comments_df.where((comments_df.show_title == 'Arthur Christmas') & (comments_df.author == 'default_author')).withColumn('comments',fc.explode(comments_df.comments).alias('comments')).select(comments_df.show_title, comments_df.source, comments_df.title,
                                     comments_df.description, comments_df.author,
                                    'comments.*')        

In [99]:
unnest_comments.printSchema()

root
 |-- show_title: string (nullable = true)
 |-- source: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- author: string (nullable = true)
 |-- comment_id: string (nullable = true)
 |-- body: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- score: integer (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- submission_id: string (nullable = true)



In [100]:
spark.catalog.dropTempView("catalog_temp")
unnest_catalog.createTempView('catalog_temp')
spark.catalog.dropTempView("comments_temp")
unnest_comments.createTempView('comments_temp')

In [119]:
structured_result = spark.sql(
            'select ct.date_added,ct.release_year,ct.title,ct.type,ct.duration,ct.description,ct.director, '
            'COLLECT_LIST(STRUCT(co.body as body,co.author,co.created_utc as created, co.score,null as sentiment, '
            'null as description_word, null as source)) as comments, '
            'COLLECT_LIST(STRUCT(ct.actor as name)) as actor '
            ' from catalog_temp ct inner join comments_temp co on ct.title=co.show_title '
            ' group by ct.date_added,ct.release_year,ct.title,ct.type,ct.duration,ct.description,ct.director')

In [107]:
structured_result.select(fc.explode('comments')).limit(5).collect()

[Row(col=Row(body='jdjmzfvxgdthheyoxdjjmtofrefvotwnfayj', author='default_author', created=datetime.datetime(2020, 3, 28, 16, 54, 20, 790367), score=74, sentiment=None, description_word=None, source=None)),
 Row(col=Row(body='jdjmzfvxgdthheyoxdjjmtofrefvotwnfayj', author='default_author', created=datetime.datetime(2020, 3, 28, 16, 54, 20, 790367), score=74, sentiment=None, description_word=None, source=None)),
 Row(col=Row(body='jdjmzfvxgdthheyoxdjjmtofrefvotwnfayj', author='default_author', created=datetime.datetime(2020, 3, 28, 16, 54, 20, 790367), score=74, sentiment=None, description_word=None, source=None)),
 Row(col=Row(body='jdjmzfvxgdthheyoxdjjmtofrefvotwnfayj', author='default_author', created=datetime.datetime(2020, 3, 28, 16, 54, 20, 790367), score=74, sentiment=None, description_word=None, source=None)),
 Row(col=Row(body='jdjmzfvxgdthheyoxdjjmtofrefvotwnfayj', author='default_author', created=datetime.datetime(2020, 3, 28, 16, 54, 20, 790367), score=74, sentiment=None, des

In [173]:
structured_result.printSchema()

root
 |-- date_added: date (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- created: timestamp (nullable = true)
 |    |    |-- score: integer (nullable = true)
 |    |    |-- sentiment: null (nullable = true)
 |    |    |-- description_word: null (nullable = true)
 |    |    |-- source: null (nullable = true)
 |-- actor: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)



In [124]:
    def get_df_schema():
        return tp.StructType([tp.StructField('added_date', tp.DateType(), False),
                              tp.StructField('release_year', tp.IntegerType(), True),
                              tp.StructField('title', tp.StringType(), False),
                              tp.StructField('type', tp.StringType(), False),
                              tp.StructField('duration', tp.StringType(), True),
                              tp.StructField('description', tp.StringType(), True),
                              tp.StructField('director', tp.StringType(), True),
                              tp.StructField('comments', tp.ArrayType(tp.StructType([
                                  tp.StructField('body', tp.StringType(), False),
                                  tp.StructField('author', tp.StringType(), False),
                                  tp.StructField('created_utc', tp.TimestampType(), False),
                                  tp.StructField('score', tp.IntegerType(), False),
                                  tp.StructField('sentiment', tp.StringType(), True),
                                  tp.StructField('description_word', tp.StringType(), True),
                                  tp.StructField('source', tp.StringType(), True)
                              ])), False),
                              tp.StructField('actors', tp.ArrayType(tp.StructType([
                                  tp.StructField('name', tp.StringType(), False)
                              ])), True)
                              ])

In [125]:
get_df_schema()

StructType(List(StructField(added_date,DateType,false),StructField(release_year,IntegerType,true),StructField(title,StringType,false),StructField(type,StringType,false),StructField(duration,StringType,true),StructField(description,StringType,true),StructField(director,StringType,true),StructField(comments,ArrayType(StructType(List(StructField(body,StringType,false),StructField(author,StringType,false),StructField(created_utc,TimestampType,false),StructField(score,IntegerType,false),StructField(sentiment,StringType,true),StructField(description_word,StringType,true),StructField(source,StringType,true))),true),false),StructField(actors,ArrayType(StructType(List(StructField(name,StringType,false))),true),true)))

In [112]:
content_review_structured = spark.createDataFrame(structured_result.rdd, get_df_schema())

In [113]:
content_review_structured.printSchema()

root
 |-- added_date: date (nullable = false)
 |-- release_year: integer (nullable = true)
 |-- title: string (nullable = false)
 |-- type: string (nullable = false)
 |-- duration: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- comments: struct (nullable = false)
 |    |-- body: string (nullable = false)
 |    |-- author: string (nullable = false)
 |    |-- created_utc: timestamp (nullable = false)
 |    |-- score: integer (nullable = false)
 |    |-- sentiment: string (nullable = true)
 |    |-- description_word: string (nullable = true)
 |    |-- source: string (nullable = true)
 |-- actors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = false)



In [115]:
content_review_structured.select('comments').toPandas()

Py4JJavaError: An error occurred while calling o1140.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 223.0 failed 1 times, most recent failure: Lost task 26.0 in stage 223.0 (TID 14477, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1370, in verify_struct
    verifier(v)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1368, in verify_struct
    "length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: field comments: Length of object (44000) does not match with length of fields (7)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1370, in verify_struct
    verifier(v)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py", line 1368, in verify_struct
    "length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: field comments: Length of object (44000) does not match with length of fields (7)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
