In [1]:
# List the entries under /FileStore/tables and render the listing.
# NOTE(review): `display` and `dbutils` are not defined in this file —
# presumably injected by the Databricks notebook runtime; confirm.
display(dbutils.fs.ls("/FileStore/tables"))

In [2]:
# Load the PM2.5 CSV as an RDD with one element per text line.
# NOTE(review): `sc` is not defined in this file — presumably the SparkContext
# provided by the notebook runtime; confirm.
weather_data = sc.textFile("/FileStore/tables/pm2_5Taiwan-3b69f.csv")

In [3]:
# Split every CSV line on commas so each record becomes a list of fields.
# NOTE(review): a plain split does not understand quoted fields; this assumes
# the file never contains a comma inside a value — confirm against the data.
weather_data_rdd = weather_data.map(lambda line: line.split(","))

# Preview the first 30 parsed records, one space-separated record per line.
# join() replaces the per-index print loop and avoids the stray trailing space
# the old `print ""` left on every line; print(<single value>) behaves the same
# on Python 2 and Python 3.
for fields in weather_data_rdd.take(30):
    print(" ".join(fields))

In [4]:
pm25schema = weather_data_rdd.first()
for i in pm25schema:
  print i,

In [5]:
import math
def remove_row_with_noise (x):
    """Tell whether a parsed CSV row holds only clean readings.

    The first three fields are skipped; every remaining field must satisfy
    ``isdecimal()``, i.e. consist solely of decimal digit characters. Values
    such as "1.5", "-3", "NR" or the empty string therefore mark the row as
    noisy.

    Args:
        x: list of field strings for one record.
           NOTE(review): ``isdecimal`` exists on unicode strings only under
           Python 2 — this assumes the fields are unicode; confirm.

    Returns:
        True when the row has at least one reading and all readings are
        decimal digit strings, False otherwise.
    """
    readings = x[3:]
    # A row with no reading columns (blank line, truncated record) used to pass
    # vacuously and then broke later steps that index x[1], x[2] or x[3..26].
    if not readings:
        return False
    return all(value.isdecimal() for value in readings)

In [6]:
# Keep only data records: drop the header row (equal to pm25schema) and every
# row that remove_row_with_noise() rejects.
clean_weather_data = weather_data_rdd.filter(
    lambda row: row != pm25schema and remove_row_with_noise(row)
)

# 練習1: 讓我們求取2015年，大里每小時的平均pm25數值。

### 把大里站之PM25資料撈出來

In [9]:
# Select the rows whose second field is the Dali station and whose third field
# is the PM2.5 measure.
dalipm25 = clean_weather_data.filter(
    lambda row: row[1] == u'大里' and row[2] == "PM2.5"
)

# Preview the first 10 matching rows, one space-separated row per line.
lis = dalipm25.take(10)
for fields in lis:
    print(" ".join(fields))

# 作法：將每小時資料轉成(小時,pm數值)，以求取每小時的平均值。

例如：
    2015/01/01 大里 PM2.5 53 55 58 53 43 36 35 42 55 64 65 59 52 44 47 41 43 40 42 35 28 20 18 16
    --> [('hr_0', 53.0), ('hr_1', 55.0), ('hr_2', 58.0), ('hr_3', 53.0), ('hr_4', 43.0), ('hr_5', 36.0), ('hr_6', 35.0), ('hr_7', 42.0), ('hr_8', 55.0), ('hr_9', 64.0), ('hr_10', 65.0), ('hr_11', 59.0), ('hr_12', 52.0), ('hr_13', 44.0), ('hr_14', 47.0), ('hr_15', 41.0), ('hr_16', 43.0), ('hr_17', 40.0), ('hr_18', 42.0), ('hr_19', 35.0), ('hr_20', 28.0), ('hr_21', 20.0), ('hr_22', 18.0), ('hr_23', 16.0)]

In [11]:
def hourKeyGen(x):
    """Expand one daily record into 24 (hour label, reading) pairs.

    Fields 3 through 26 of ``x`` are converted to float and labelled
    "hr_0" .. "hr_23" in order.

    Args:
        x: sequence of fields for one record; positions 3..26 must be
           convertible to float.

    Returns:
        A list of 24 ``(label, float)`` tuples.
    """
    return [("hr_%d" % hour, float(x[hour + 3])) for hour in range(24)]

# Number of Dali PM2.5 rows; each hourly total is divided by this value.
count = dalipm25.count()

# Sum the readings per hour label, then turn each total into a mean.
hourly_totals = dalipm25.flatMap(hourKeyGen).reduceByKey(lambda acc, val: acc + val)
hourly_means = hourly_totals.mapValues(lambda total: total / count)

# Swap to (mean, hour label) so top() orders the 24 hours by mean, largest first.
HourSum = hourly_means.map(lambda pair: (pair[1], pair[0])).top(24)

print(HourSum)

# 使用DataFrame 來計算每小時平均值

In [13]:
from pyspark.sql import SQLContext
from pyspark.sql import Row
# Turn each Dali PM2.5 record into a Row with date/location/measure plus one
# float column per hour.
dalipm25row = dalipm25.map(
    lambda p: Row(
        date=p[0],
        location=p[1],
        measure=p[2],
        # Columns hr_01 .. hr_24 take the readings stored at positions 3 .. 26.
        **dict(("hr_%02d" % hour, float(p[hour + 2])) for hour in range(1, 25))
    )
)




In [14]:
# Build a DataFrame from the RDD of Row objects and print its first rows.
# NOTE(review): `sqlContext` is not defined in this file — presumably supplied
# by the notebook runtime; confirm.
df = sqlContext.createDataFrame(dalipm25row)
df.show()

In [15]:
# Number of rows in the DataFrame.
df.count()

In [16]:
# Print the column names and types of the DataFrame.
df.printSchema()

## 使用 DataFrame.filter() 來進行row資料之條件計算 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [18]:
# Show the rows whose hr_01 value is greater than 30.
df.filter(df.hr_01>30).show()

## 使用 DataFrame.filter() 搭配 DataFrame.select() 來進行row資料之條件計算與資料之Projection http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [20]:
# Keep the rows whose hr_01 value is greater than 50, then show only the
# hr_01, location and measure columns.
df.filter(df.hr_01>50).select("hr_01","location","measure").show()

## 使用 DataFrame.agg() 來進行column數值之統計計算 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [22]:
from pyspark.sql import functions as F
# Aggregate the whole DataFrame into the mean of the hr_01 column and show it.
df.agg(F.mean(df.hr_01)).show()

## 使用 DataFrame.describe() 來進行DataFrame or Column 資料之統計 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [24]:
# Show the summary statistics that describe() computes for the DataFrame.
df.describe().show()

# 使用 Spark SQL來下達SQL查詢

In [26]:
# Register the DataFrame under the name "DaliTable" so the SQL queries in the
# following cells can refer to it.
df.registerTempTable("DaliTable")

In [27]:
# Show every DaliTable row whose date column equals '2015/02/07'.
sqlContext.sql("""
                select * from DaliTable where date ='2015/02/07'
               """).show()

In [28]:
# Count the DaliTable rows whose hr_01 value is greater than 100; the result
# column is aliased "count".
sqlContext.sql("""
                select count(*) count 
                from DaliTable 
                where hr_01 > 100
               """).show()

In [29]:
# Average of the hr_01 column over every DaliTable row.
# The result column was previously aliased "count" (copied from the counting
# query above), which mislabeled an average as a count in the output.
sqlContext.sql("""
                select AVG(hr_01) as avg_hr_01
                from DaliTable
               """).show()

In [30]:
# For each row show date, location, hr_01, hr_02 and their difference
# (hr_01 - hr_02) as "diff".
sqlContext.sql("""
                select date,location, hr_01, hr_02, hr_01-hr_02 as diff 
                from DaliTable
                """).show()

In [31]:
# Same projection as the previous query, ordered so the largest
# hr_01 - hr_02 difference comes first.
sqlContext.sql("""
                select date,location, hr_01, hr_02, hr_01-hr_02 as diff 
                from DaliTable 
                order by diff DESC
                """).show()

In [32]:
# Per row, add up hr_01 through hr_08 as "sum" and list rows with the largest
# sum first.
sqlContext.sql("""
                select date,location, hr_01+hr_02+hr_03+hr_04+hr_05+hr_06+hr_07+hr_08 as sum 
                from DaliTable 
                order by sum DESC
                """).show()

In [33]:
# Collapse DaliTable into a single row: the averages of hr_01 .. hr_08, plus
# first(location) and first(date) as sample values.
# NOTE(review): first(date) has no alias, so its output column name is whatever
# Spark generates — confirm whether an alias is wanted.
sqlContext.sql("""
                select first(location) as location, first(date), avg(hr_01) as hr_01, avg(hr_02) as hr_02, avg(hr_03) as hr_03, avg(hr_04) as hr_04,
                    avg(hr_05) as hr_05, avg(hr_06) as hr_06, avg(hr_07) as hr_07, avg(hr_08) as hr_08
                from DaliTable
            """).show()

# Perform Pearson Correlation using DataFrame.corr( )
# 練習: 計算大里區pm10, pm2.5 間之關聯度 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
使用前面所建立的clean_weather_data rdd資料
    
    
    corr(col1, col2, method=None)
    Calculates the correlation of two columns of a DataFrame as a double value. Currently only supports the Pearson Correlation Coefficient. DataFrame.corr() and DataFrameStatFunctions.corr() are aliases of each other.

    Parameters:	
    col1 – The name of the first column
    col2 – The name of the second column
    method – The correlation method. Currently only supports “pearson”
    New in version 1.4.

In [36]:
def Generated_Measurement(x):
    """Explode one daily record into one tuple per reading.

    Args:
        x: list of fields for one record; x[0] is used as the date, x[2] as
           the measure name, and every field from position 3 onward as a
           reading.

    Returns:
        A list of ``(date, measure, "hr<k>", reading)`` tuples, where k counts
        the readings from 0 and ``reading`` is the original, unconverted field.
    """
    day, kind = x[0], x[2]
    return [
        (day, kind, "hr" + str(offset), reading)
        for offset, reading in enumerate(x[3:])
    ]

# Keep the Dali station rows whose measure is PM2.5 or PM10.
daliData = clean_weather_data.filter(
    lambda row: row[1] == u"大里" and row[2] in (u"PM2.5", u"PM10")
)
# Cache the filtered RDD.
daliData.cache()

In [37]:
from pyspark.sql import Row

def _to_pm_row(keyed_readings):
    """Build a Row from ``((date, hour), {measure name: reading})``.

    Readings are looked up by measure name, so the result does not depend on
    the order in which the two measurements arrived in the group.
    """
    (day, hour), by_measure = keyed_readings
    return Row(
        date=day,
        time=hour,
        pm10=float(by_measure[u"PM10"]),
        pm25=float(by_measure[u"PM2.5"]),
    )


# Pair up the PM10 and PM2.5 readings that share the same (date, hour).
daliDataRow = (
    daliData
    .flatMap(Generated_Measurement)
    # (date, measure, hour, value) -> ((date, hour), (measure, value))
    .map(lambda m: ((m[0], m[2]), (m[1], m[3])))
    .groupByKey()
    .mapValues(list)
    # As before, only keep slots that have exactly two readings.
    .filter(lambda kv: len(kv[1]) == 2)
    .mapValues(dict)
    # Spark does not guarantee the order of elements inside a group, so the
    # old positional pick (first = PM10, second = PM2.5) could swap the two
    # values. Require both measures by name and read them by name instead;
    # this also drops slots that hold two readings of the same measure.
    .filter(lambda kv: u"PM10" in kv[1] and u"PM2.5" in kv[1])
    .map(_to_pm_row)
)

df = sqlContext.createDataFrame(daliDataRow)

In [38]:
# Show the first 10 rows of the PM10 / PM2.5 DataFrame.
df.show(10)

In [39]:
from pyspark.sql import functions as F
# Aggregate the whole DataFrame into the average of the pm10 column and show it.
df.agg(F.avg(df.pm10)).show()

In [40]:
# Correlation between the pm10 and pm25 columns; per the notes above, corr()
# currently supports only the Pearson coefficient and returns a double.
df.corr("pm10","pm25")

In [41]:
# Show the summary statistics that describe() computes for the PM10 / PM2.5
# DataFrame.
df.describe().show()

# 作業1 (個人): 請使用SPARK SQL求取2015年，全國pm2.5最高的前十個工作站測點以及其日期。

# 作業2 (團體): 請求取2015年，全國與大里區pm2.5濃度關聯度最高的前三個工作站測點。

# 作業3(個人): 請算算看2015全國哪個測站，紫爆天數最多？