In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
mydf = spark.read.parquet("s3://zandras3/502final/df.parquet/*")

In [5]:
mydf.printSchema()

root
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: string (nullable = true)
 |-- MonthYear: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- FractionDate: string (nullable = true)
 |-- Actor1Code: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1CountryCode: string (nullable = true)
 |-- Actor1KnownGroupCode: string (nullable = true)
 |-- Actor1EthnicCode: string (nullable = true)
 |-- Actor1Religion1Code: string (nullable = true)
 |-- Actor1Religion2Code: string (nullable = true)
 |-- Actor1Type1Code: string (nullable = true)
 |-- Actor1Type2Code: string (nullable = true)
 |-- Actor1Type3Code: string (nullable = true)
 |-- Actor2Code: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2CountryCode: string (nullable = true)
 |-- Actor2KnownGroupCode: string (nullable = true)
 |-- Actor2EthnicCode: string (nullable = true)
 |-- Actor2Religion1Code: string (nullable = true)
 |-- Actor2Religion2Code: stri

In [6]:
mydf.show(5)

+-------------+--------+---------+----+------------+----------+----------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+----------+------------+-----------------+--------------------+----------------+-------------------+-------------------+---------------+---------------+---------------+-----------+---------+-------------+-------------+---------+--------------+-----------+----------+-----------+-----------------+--------------+------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+--------------+--------------------+---------------------+------------------+-------------+--------------+-------------------+---------+--------------------+
|GLOBALEVENTID| SQLDATE|MonthYear|Year|FractionDate|Actor1Code|Actor1Name|Acto

In [7]:
df_feat = mydf.select(mydf.SQLDATE, mydf.MonthYear, mydf.Year, 
                      mydf.Actor2Code, mydf.Actor2Name, mydf.Actor2CountryCode, 
                      mydf.QuadClass, mydf.GoldsteinScale, mydf.NumSources, 
                      mydf.AvgTone, mydf.Actor1Geo_Type, mydf.Actor1Geo_CountryCode,
                     mydf.Actor1Geo_Lat, mydf.Actor1Geo_Long, mydf.Actor2Geo_Type, 
                      mydf.Actor2Geo_CountryCode, mydf.Actor2Geo_Lat, mydf.Actor2Geo_Long,
                     mydf.ActionGeo_Type, mydf.ActionGeo_CountryCode,
                     mydf.ActionGeo_Lat, mydf.ActionGeo_Long)

In [8]:
df_feat.printSchema()

root
 |-- SQLDATE: string (nullable = true)
 |-- MonthYear: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Actor2Code: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2CountryCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: string (nullable = true)
 |-- NumSources: string (nullable = true)
 |-- AvgTone: string (nullable = true)
 |-- Actor1Geo_Type: string (nullable = true)
 |-- Actor1Geo_CountryCode: string (nullable = true)
 |-- Actor1Geo_Lat: string (nullable = true)
 |-- Actor1Geo_Long: string (nullable = true)
 |-- Actor2Geo_Type: string (nullable = true)
 |-- Actor2Geo_CountryCode: string (nullable = true)
 |-- Actor2Geo_Lat: string (nullable = true)
 |-- Actor2Geo_Long: string (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- ActionGeo_Lat: string (nullable = true)
 |-- ActionGeo_Long: string (nullable = true)



In [9]:
df_feat.show(15)

+--------+---------+----+----------+------------+-----------------+---------+--------------+----------+-----------------+--------------+---------------------+-------------+--------------+--------------+---------------------+-------------+--------------+--------------+---------------------+-------------+--------------+
| SQLDATE|MonthYear|Year|Actor2Code|  Actor2Name|Actor2CountryCode|QuadClass|GoldsteinScale|NumSources|          AvgTone|Actor1Geo_Type|Actor1Geo_CountryCode|Actor1Geo_Lat|Actor1Geo_Long|Actor2Geo_Type|Actor2Geo_CountryCode|Actor2Geo_Lat|Actor2Geo_Long|ActionGeo_Type|ActionGeo_CountryCode|ActionGeo_Lat|ActionGeo_Long|
+--------+---------+----+----------+------------+-----------------+---------+--------------+----------+-----------------+--------------+---------------------+-------------+--------------+--------------+---------------------+-------------+--------------+--------------+---------------------+-------------+--------------+
|20151224|   201512|2015|       BUS|    

In [10]:
from pyspark.sql.functions import avg

In [13]:
tone = df_feat.select("Year","AvgTone").groupBy("Year").agg(avg("AvgTone"))

In [14]:
tone.head(10)

[Row(Year='2016', avg(AvgTone)=-2.092142628981268),
 Row(Year='2006', avg(AvgTone)=-1.5654240261469206),
 Row(Year='2007', avg(AvgTone)=-1.4407238930749484),
 Row(Year='2015', avg(AvgTone)=-2.121795877376201)]

In [23]:
tones_pd = tone.toPandas()

In [24]:
tones_pd

Unnamed: 0,Year,avg(AvgTone)
0,2016,-2.092143
1,2006,-1.565424
2,2007,-1.440724
3,2015,-2.121796


In [16]:
# df_feat = df_feat.withColumn("AvgTone", df_feat["AvgTone"].cast("double"))

In [18]:
import dask
import dask.dataframe as dd

In [22]:
mydf.write.save("s3://zandras3/502final/news.csv", format='csv', header=True)

In [35]:
dask_df = dd.read_csv('s3://zandras3/502final/news.csv/*.csv')

In [36]:
dask_df

Unnamed: 0_level_0,GLOBALEVENTID,SQLDATE,MonthYear,Year,FractionDate,Actor1Code,Actor1Name,Actor1CountryCode,Actor1KnownGroupCode,Actor1EthnicCode,Actor1Religion1Code,Actor1Religion2Code,Actor1Type1Code,Actor1Type2Code,Actor1Type3Code,Actor2Code,Actor2Name,Actor2CountryCode,Actor2KnownGroupCode,Actor2EthnicCode,Actor2Religion1Code,Actor2Religion2Code,Actor2Type1Code,Actor2Type2Code,Actor2Type3Code,IsRootEvent,EventCode,EventBaseCode,EventRootCode,QuadClass,GoldsteinScale,NumMentions,NumSources,NumArticles,AvgTone,Actor1Geo_Type,Actor1Geo_FullName,Actor1Geo_CountryCode,Actor1Geo_ADM1Code,Actor1Geo_Lat,Actor1Geo_Long,Actor1Geo_FeatureID,Actor2Geo_Type,Actor2Geo_FullName,Actor2Geo_CountryCode,Actor2Geo_ADM1Code,Actor2Geo_Lat,Actor2Geo_Long,Actor2Geo_FeatureID,ActionGeo_Type,ActionGeo_FullName,ActionGeo_CountryCode,ActionGeo_ADM1Code,ActionGeo_Lat,ActionGeo_Long,ActionGeo_FeatureID,DATEADDED,SOURCEURL
npartitions=520,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1,Unnamed: 56_level_1,Unnamed: 57_level_1,Unnamed: 58_level_1
,int64,int64,int64,int64,float64,object,object,object,object,float64,object,object,object,object,float64,object,object,object,object,float64,object,float64,object,object,float64,int64,int64,int64,int64,int64,float64,int64,int64,int64,float64,int64,object,object,object,float64,float64,object,int64,object,object,object,float64,float64,object,int64,object,object,object,float64,float64,object,int64,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [31]:
from pandas.io.common import EmptyDataError

file = 's3://zandras3/502final/news.csv'

def read_data(file):
    try:
        df = pd.read_csv(file, delim_whitespace=True)
        print("yes")
    except EmptyDataError:
        df = pd.DataFrame()
        print("no")

#     return df