# Project Group 15

## Summary
- Group 15 (Aydin Schwartz, Deepak Singh, Tianrui Hu, Vichitra Kumar)

- Data : NY times API Data (https://developer.nytimes.com/docs/archive-product/1/overview)

- Cluster Setting : Multi Node, No Isolation Shared, Runtime : 11.0, Worker/Driver type : e2-highmem-2 (Min workers : 1, Max workers : 2 with autoscaling)

- To improve efficiency, configure to have spark.executor.memory and spark.driver.memory being 4g and .cache() intermediate rdd which being used often.

- Objectives : Using New York Times News Data from 1991 to 2021 from various locations, topics and organizations, see the search trends and plot using line/scatter/bubble plots.

> 1) Visualize trends of overall news articles by locations/topics/organizations using line and/or scatter plots

> 2) Visualize keywords / sections most talked about in the news articles in past 30 years

In [0]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import pyspark

from datetime import datetime as dt



In [0]:
sc._conf.getAll()

Out[2]: [('spark.databricks.preemption.enabled', 'true'),
 ('spark.databricks.clusterUsageTags.clusterId', '1213-212447-k68mp9tw'),
 ('spark.sql.hive.metastore.jars', '/databricks/databricks-hive/*'),
 ('spark.driver.tempDirectory', '/local_disk0/tmp'),
 ('spark.databricks.clusterUsageTags.onInstancePool', 'false'),
 ('spark.sql.warehouse.dir', 'dbfs:/user/hive/warehouse'),
 ('spark.databricks.managedCatalog.clientClassName',
  'com.databricks.managedcatalog.ManagedCatalogClientImpl'),
 ('spark.hadoop.fs.gs.impl',
  'shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem'),
 ('spark.databricks.clusterUsageTags.clusterTargetWorkers', '1'),
 ('spark.hadoop.fs.fcfs-s3.impl.disable.cache', 'true'),
 ('spark.hadoop.fs.s3a.retry.limit', '20'),
 ('spark.sql.streaming.checkpointFileManagerClass',
  'com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager'),
 ('spark.databricks.service.dbutils.repl.backend',
  'com.databricks.dbconnect.ReplDBUtils'),
 ('spark.hadoop.

In [0]:
sc._conf.get('spark.executor.memory'), sc._conf.get('spark.driver.memory')

Out[3]: ('4g', '4g')

In [0]:
file_path = 'gs://g15-bucket/nyt_apidata'

In [0]:
df = spark.read.format("com.databricks.spark.csv")\
    .option('delimiter',',').option('header',True)\
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ssx")\
    .option("quote", '\"').option("escape", "\"").option("multiline", True).option("inferSchema", True).csv(file_path)

In [0]:
rdd = df.rdd.map(list)
rdd.count()

Out[7]: 2894493

## Data Preparation

In [0]:
dw_mapping={0: 'Monday', 1: 'Tuesday', 2: 'Wednesday',3: 'Thursday',4: 'Friday',5: 'Saturday', 6: 'Sunday'}

In [0]:
#x[9] = 'section_name', x[5] = 'keywords', x[6] = 'pub_date'
rdd2 = rdd.map(lambda x:[x[9], x[5], x[6], str(x[6]).replace(' ','-').replace('T', '-').replace(':', '-').replace('+', '-').split('-')])\
                .filter(lambda x: len(x[3])==7 and len(x[2])==24 and len(x[3][0])==4)\
                .map(lambda x: [dt.fromisoformat(x[2].split('+')[0]).date(), x[0], eval(x[1])])\
                .map(lambda x: [x[0], x[0].year, x[0].month, x[0].day, dw_mapping[x[0].weekday()], x[1], 
                                None if len(x[2])==0 else x[2][0]['name'].lower(), None if len(x[2])==0 else x[2][0]['value'].lower()])
    
rdd2.cache()

Out[10]: PythonRDD[10] at RDD at PythonRDD.scala:58

In [0]:
#rdd2.count(), rdd2.take(3)

In [0]:
#getting a list of different types of keywords
#locations list
rdd_l = rdd2.filter(lambda x: x[6] == 'glocations').map(lambda x: (x[7], 1)).reduceByKey(lambda x, y: x+y)\
    .filter(lambda x: x[1] >= 200).map(lambda x : [x[0], x[1]]).sortBy(lambda x: x[1], ascending = False)
loc_list = rdd_l.keys().filter(lambda x: x !='new york city').collect()

#subjects list
rdd_s = rdd2.filter(lambda x: x[6] == 'subject').map(lambda x: (x[7], 1)).reduceByKey(lambda x, y: x+y)\
    .filter(lambda x: x[1] >= 100).map(lambda x : [x[0], x[1]]).sortBy(lambda x: x[1], ascending = False)
sub_list = rdd_s.keys().filter(lambda x: x!='company earnings' and x!='company reports' and x!='no index terms').collect()

#organizations list
rdd_o = rdd2.filter(lambda x: x[6] == 'organizations').map(lambda x: (x[7], 1)).reduceByKey(lambda x, y: x+y)\
    .filter(lambda x: x[1] >= 100).map(lambda x : [x[0], x[1]]).sortBy(lambda x: x[1], ascending = False)
org_list = rdd_o.keys().collect()

In [0]:
#type of keywords in news articles
rdd_t = rdd2.map(lambda x: (x[6], 1)).reduceByKey(lambda x, y: x+y)
rdd_t.collect()

Out[14]: [(None, 503064),
 ('glocations', 589551),
 ('creative_works', 4393),
 ('organizations', 427074),
 ('subject', 864790),
 ('persons', 505621)]

## Creating Plotly Graphs

### Display Yearly Trend (Deepak Singh)

- Create a stacked bar chart showing yearly counts of news published per weekday

In [0]:
#Weekly Trends
rdd6 = rdd2.map(lambda x: ((x[1], x[4]), 1)).reduceByKey(lambda x, y: x+y)\
    .map(lambda x : [x[0][0], x[0][1], x[1]]).filter(lambda x: x[0] >1990)
rdd6.count(), rdd6.take(5)

Out[19]: (217,
 [[2003, 'Sunday', 29264],
  [1995, 'Tuesday', 10221],
  [1993, 'Thursday', 10505],
  [2013, 'Tuesday', 13259],
  [2015, 'Monday', 11688]])

In [0]:
# Weekly News Publications on all Topics
org_count_df = pd.DataFrame(rdd6.collect(), columns=['year', 'weekday', 'count'])
fig = px.bar(org_count_df, 
                 x="year", 
                 y="count",
                 color="weekday",
                 height=800,
                 width = 1200,
                 title = 'Yearly Trend of News Articles per weekday'
                )
fig.show()

<div style="border-radius:10px; border : #682F2F solid; background-color:#e1f6e1; font-size:100%; padding-left:10px; text-align:left">
<h3> The above stacked bar chart indicates changing patterns in published news articles </h3>
- Before 2006, most of the news was published on Sundays <br>
- After 2006, there has been a declining trend in overall number of articles published <br>
- Barring Saturday, Monday and Sunday, all other days are seeing an almost equal number of news articles being published <br>
- Years 2006 and 2009 saw a sudden spike in number of news articles <br>

### Top Newsworthy Locations (Deepak Singh)

- Create a scatter plot showing yearly counts of news published per location

In [0]:
# locations
rdd3 = rdd2.filter(lambda x: x[6] == 'glocations').map(lambda x: ((x[7], x[0].year), 1)).reduceByKey(lambda x, y: x+y)\
    .filter(lambda x: x[1] >= 200).map(lambda x : [x[0][0], x[0][1], x[1]])
rdd3.count(), rdd3.take(5)

Out[15]: (409,
 [['new orleans (la)', 2005, 441],
  ['israel', 2002, 1037],
  ['great britain', 2001, 551],
  ['europe', 2001, 648],
  ['israel', 2003, 744]])

In [0]:
# Top Newsworthy Locations
loc_count_df = pd.DataFrame(rdd3.collect(), columns=['locations', 'year', 'count'])
fig = px.scatter(loc_count_df[loc_count_df['locations'].isin(loc_list)], 
                 x="year", 
                 y="count",
                 color="locations",
                 height=600, 
                 width = 1200,
                 title = 'Yearly Trend of News Articles from different Locations'
                )
fig.show()

<div style="border-radius:10px; border : #682F2F solid; background-color:#e1f6e1; font-size:100%; padding-left:10px; text-align:left">
<h3> The above scatter plot highlights the following key points </h3>
- Prior to 2006, news articles were being published on a diverse range of locations across the world <br>
- This has reduced drastically post 2006, and only a few locations see coverage of more than 200 news articles per year <br>
- The news locations of most interest in the past few years have been: China, California, Iran, Ukraine, Russia <br>

### Yearly Trend of News Articles Topics (Tianrui)

- Create a scatter plot showing yearly counts of news published per topic of interest

In [0]:
#Topics or Subjects
rdd4 = rdd2.filter(lambda x: x[6] == 'subject').map(lambda x: ((x[7], x[0].year), 1)).reduceByKey(lambda x, y: x+y)\
    .filter(lambda x: x[1] >= 300).map(lambda x : [x[0][0], x[0][1], x[1]])
rdd4.count(), rdd4.take(5)

Out[23]: (421,
 [['mergers, acquisitions and divestitures', 2009, 415],
  ['united states economy', 2009, 786],
  ['travel and vacations', 2008, 1156],
  ['mergers, acquisitions and divestitures', 2008, 524],
  ['housing', 2008, 429]])

In [0]:
# Top Newsworthy Topics
sub_count_df = pd.DataFrame(rdd4.collect(), columns=['topics', 'year', 'count'])
fig = px.scatter(sub_count_df[sub_count_df['topics'].isin(sub_list)], 
                 x="year", 
                 y="count",
                 color="topics",
                 height=500,
                 width = 1200,
                 title = 'Yearly Trend of News Articles focusing on Various Topics'
                )
fig.show()

<div style="border-radius:10px; border : #682F2F solid; background-color:#e1f6e1; font-size:100%; padding-left:10px; text-align:left">
    <h3> The above scatter plot highlights the following key points </h3>
- Prior to 2006, there were only a few topics being published digitally (it could be due to internet/digital propensity) <br>
- Coronavirus and Presidential elections of 2020 are the clear highlights of the past couple of years <br>

### Trend of News Sections (Vichitra)

In [0]:
rdd_section = rdd.map(lambda x: (x[-4], 1)).reduceByKey(lambda x, y: x+y).sortBy(lambda x: x[1], ascending=False).collect()

In [0]:
section_count_df = pd.DataFrame(rdd_section[:20], columns=['section', 'count'])
fig = px.bar(section_count_df, 
                 x="section", 
                 y="count",
                 color="section",
                 height=800,
                 width = 1200,
                 title = 'Most Popular News Sections in NYT'
                )
fig.show()

<div style="border-radius:10px; border : #682F2F solid; background-color:#e1f6e1; font-size:100%; padding-left:10px; text-align:left">
<h3> The above bar plot indicates the most popular new sections of NYT </h3>
- Based on Business Day articles, NYT seems to be the best publisher for business enthusiasts <br>
- Technology, Science and health gets quite less coverage from NYT <br>
- Books getting more coverage than technology, science and health and movie aligns with NYT Best Sellers list  <br>

###Trend for Diseases Publication (Vichitra)

In [0]:
def term_counts(rdd, term_list, section='Health'):
    final_df = pd.DataFrame()
    
    for term in term_list:
        temp_rdd = rdd.filter(lambda x: x[0] is not None and x[-4] == section)\
                .filter(lambda x: term in x[0].lower())\
                .map(lambda x: (x[6][:4], 1)).reduceByKey(lambda x, y: x+y)\
                .sortBy(lambda x: x[0], ascending=True)\
                .map(lambda x: (x[0], x[1], term)).collect()
        
        temp_df = pd.DataFrame(temp_rdd, columns = ['year', 'count', 'term'])
        temp_rdd = None
        final_df = pd.concat([final_df, temp_df])
    return final_df

In [0]:
dff = term_counts(rdd, ['cancer', 'coronavirus', 'vaccine', 'depression', 'heart', 'flu'], section='Health')

In [0]:
fig = px.line(dff, x="year", y='count', color = 'term', title='Trend for diseases publication')
#fig.update_traces(textposition="bottom right")
fig.show()

<div style="border-radius:10px; border : #682F2F solid; background-color:#e1f6e1; font-size:100%; padding-left:10px; text-align:left">
 - From the above plot, We can see how other diseases were underreported once COVID hit in 2020 <br>

### Yearly Trend for Average Word Count (Vichitra)

In [0]:
rdd_section_avg = rdd.map(lambda x: (x[6], x[8], x[-1]))\
                     .filter(lambda x: (x[2]>0  and x[1] is not None and x[1] != 'None'))\
                     .map(lambda x: ((int(x[0][:4]), x[1]), x[2])).groupByKey().mapValues(lambda x: sum(x) / len(x))

In [0]:
desk_words_avg = rdd_section_avg.collect()
ls = [(item[0][0], item[0][1], item[1]) for item in desk_words_avg]
df = pd.DataFrame(ls, columns= ['year', 'desk', 'avg word count'])
df = df[df['desk'].isin(['Classified', 'Business/Financial Desk', 
                         'Metropolitan Desk', 'Editorial Desk', 'Sports Desk',
                          'Foreign Desk', 'National Desk'])]
df = df.sort_values('year')

In [0]:
fig = px.line(df, x="year", y='avg word count', color = 'desk', title='Trend for avg word from different news desk over the years')
#fig.update_traces(textposition="bottom right")
fig.show()

### Yearly Trend of News Articles Organizations(Aydin)

- Create a scatter plot showing organization getting most news coverage in past 30 years

In [0]:
#Organizations
rdd5 = rdd2.filter(lambda x: x[6] == 'organizations').map(lambda x: ((x[7], x[0].year), 1)).reduceByKey(lambda x, y: x+y)\
    .filter(lambda x: x[1] >= 80).map(lambda x : [x[0][0], x[0][1], x[1]])
rdd5.count(), rdd5.take(5)

Out[25]: (461,
 [['new york jets', 2002, 198],
  ['supreme court', 2004, 140],
  ['united nations', 2001, 103],
  ['food and drug administration', 2003, 125],
  ['new york times neediest cases fund', 2009, 98]])

In [0]:
# Top Newsworthy Organizations
org_count_df = pd.DataFrame(rdd5.collect(), columns=['organization', 'year', 'count'])
fig = px.scatter(org_count_df[org_count_df['organization'].isin(org_list)], 
                 x="year", 
                 y="count",
                 color="organization",
                 height=500,
                 width = 1200,
                 title = 'Yearly Trend of News Articles focusing on Various Organizations'
                )
fig.show()

<div style="border-radius:10px; border : #682F2F solid; background-color:#e1f6e1; font-size:100%; padding-left:10px; text-align:left">
    <h3> The above scatter plot indicates the following key points </h3>
- New York Yankees is the organization about which maximum number of articles have been published <br>
- There has been a decline in the total number of articles per organization per year after 2006 <br>
- Very few organization in the recent years have had news articles count of more than 100 <br>

In [0]:
rdd2.unpersist()

Out[34]: PythonRDD[10] at RDD at PythonRDD.scala:58