<a href="https://colab.research.google.com/github/OliveraNovovic/pyspark_test/blob/master/PySpark_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os

# Tell findspark/PySpark where the JDK and the extracted Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.4-bin-hadoop2.7",
})

In [0]:
import findspark

# Locate SPARK_HOME and make pyspark importable; this must run before the
# pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
from google.colab import files

# files.upload() returns {filename: file contents}. Keep the result in a variable
# rather than leaving it as the cell's last expression, which would echo the raw
# bytes of every uploaded workbook into the notebook output.
uploaded = files.upload()
print("Uploaded:", ", ".join(sorted(uploaded)))

In [0]:
# List the working directory to confirm the uploaded workbook(s) are present.
!ls


In [0]:
import pandas as pd

# Workbook uploaded earlier: read it with pandas, then hand it to Spark.
DATA_FILE = "all_merged.xlsx"

test_pd_df = pd.read_excel(DATA_FILE)
test_df = spark.createDataFrame(test_pd_df)
test_df.printSchema()


In [0]:
import datetime
from pyspark.sql.functions import year, month, dayofmonth, col

# Column helpers used by later cells to slice test_df by calendar fields.
# Example of splitting the date column (kept as a comment so it neither runs an
# extra Spark job nor gets echoed as cell output):
# test_df.select(
#     year("date").alias('year'),
#     month("date").alias('month'),
#     dayofmonth("date").alias('day')
# ).show()

In [0]:
# Basic exploration: schema, size, summary statistics and simple filters.
test_df.printSchema()

row_num = test_df.count()
col_num = len(test_df.columns)
print(f"Number of rows is {row_num}, Number of columns is {col_num}")

# General statistics over the prec column
test_df.describe('prec').show()

# Rows with prec above 70, then rows with 0 < prec < 2
test_df.select('location', 'prec', 'date').where(col('prec') > 70).show()
test_df.select('location', 'prec', 'date').where((col('prec') > 0) & (col('prec') < 2)).show()

# Extract DataFrame subsets by location and by year
location = 'Valjevo'
df_location = test_df.filter(col('location') == location)
print(f"DataFrame subset for location {location}")
df_location.show()

# year() yields an integer, so compare against an int rather than the string '2014'
target_year = 2014
df_year = test_df.filter(year('date') == target_year)
print(f"DataFrame subset for year {target_year}")
df_year.show()

In [0]:
#udf function example
from pyspark.sql.types import FloatType, StringType
from pyspark.sql.functions import udf
def square_float(x):
    """Return ``x`` squared as a Python float, for use in a FloatType UDF.

    Spark hands SQL NULL to a Python UDF as ``None``. ``None ** 2`` would raise
    TypeError and fail the whole job, so ``None`` is passed through unchanged
    (Spark stores it back as NULL).
    """
    if x is None:
        return None
    return float(x ** 2)

# Wrap square_float directly as a Spark UDF. FloatType is single precision, so
# the squared values are stored as 32-bit floats.
square_udf_float2 = udf(square_float, FloatType())

test_df.select('prec', square_udf_float2('prec').alias('prec_squared')).show()


In [0]:
#control flow in udf example
def ctrl_flow(x):
    """Bucket a precipitation value into a coarse text label.

    Returns "low prec" for x < 1, "medium prec" for 1 <= x < 10 and
    "high prec" for x >= 10. Missing values return None: SQL NULL arrives as
    None (which would raise TypeError on ``<``), and NaN fails every comparison,
    so it would otherwise be mislabelled "high prec".
    """
    import math

    if x is None or math.isnan(x):
        return None
    if x < 1:
        return "low prec"
    if x < 10:
        return "medium prec"
    return "high prec"

# Register ctrl_flow as a StringType UDF and attach its label as a new column.
prec_measure_flag = udf(ctrl_flow, StringType())
prec_flag_df = test_df.withColumn("prec_flag", prec_measure_flag(col('prec')))
prec_flag_df.select('date', 'location', 'prec', 'prec_flag').show()

# Number of rows labelled low precipitation.
# NOTE(review): presumably one row per location per day, so this counts
# location-days across all stations rather than distinct calendar days — confirm.
num = prec_flag_df.filter(col('prec_flag') == 'low prec').count()
print(f"Overall number of days with low precipitation {num}")

In [0]:
# Precipitation sum over a single year for a specific location
target_year = 2011
location = 'Kopaonik'

df_subset_year = test_df.filter(year('date') == target_year)
df_subset_year_loc = df_subset_year.filter(col('location') == location)

# A dict-style agg names its output column "sum(prec)"; the value is None when no rows match.
prec_sum = df_subset_year_loc.agg({"prec": "sum"}).collect()[0]

result = prec_sum["sum(prec)"]
print(f"Precipitation sum over year {target_year} for location {location} is {result}")


In [0]:
# Control flow with a for loop: yearly precipitation sum for several locations
locations = ['Beograd', 'Kikinda', 'Kopaonik', 'Valjevo']
target_year = 2011

# The year filter does not depend on the location, so build it once outside the loop.
df_subset_year = test_df.filter(year('date') == target_year)

for location in locations:
    df_subset_year_loc = df_subset_year.filter(col('location') == location)
    # A dict-style agg names its output column "sum(prec)"; None when no rows match.
    prec_sum = df_subset_year_loc.agg({"prec": "sum"}).collect()[0]
    result = prec_sum["sum(prec)"]
    print(f"Precipitation sum over year {target_year} for location {location} is {result}")

In [0]:
# k-means clustering of the daily observations for one year
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import year, month, dayofmonth, col
import pandas as pd
import matplotlib.pyplot as plt

CLUSTER_YEAR = 2014
NUM_CLUSTERS = 5
FEATURE_COLS = ['rad', 'tmax', 'tmin', 't_mean', 'td2m', 'evap', 'sml1', 'prec', 'vpd']

df_subs_year = test_df.filter(year('date') == CLUSTER_YEAR)

# NOTE(review): VectorAssembler's default handleInvalid="error" fails on null/NaN
# feature values. The features are also used unscaled, so if their ranges differ
# widely the widest columns dominate the distance — consider StandardScaler.
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
features_df = vecAssembler.transform(df_subs_year)

kmeans = KMeans(k=NUM_CLUSTERS, seed=1)  # fixed seed -> reproducible assignments
model = kmeans.fit(features_df.select('features'))
transformed = model.transform(features_df)

# Bring (date, location, cluster) into pandas and attach station coordinates.
panda_kmeans_res = transformed.select('date', 'location', 'prediction').toPandas()
stations = pd.read_excel('Serbia_stations3.xlsx')
merge = pd.merge(panda_kmeans_res, stations, on='location')





In [0]:
import seaborn as sns
sns.set()

# Scatter the station coordinates coloured by k-means cluster (no regression fit).
# NOTE(review): `merge` appears to hold one row per station per date, so a station's
# points overlap and likely only the last-drawn cluster colour is visible.
facet = sns.lmplot(data=merge, x='lon', y='lat', hue='prediction',
                   fit_reg=False, legend=True, legend_out=True)
facet.set_axis_labels('Longitude', 'Latitude')
facet.set(title='k-means cluster per station');


In [0]:
# Time-series plot of daily minimum temperature for one location
from pyspark.sql.functions import year, month, dayofmonth, col
import pandas as pd
import matplotlib.pyplot as plt

loc = 'Valjevo'

# Collect only the plotted columns and sort by date, so the line is drawn
# chronologically and no non-numeric column reaches the plot.
df_subs_loc = (
    test_df
    .filter(col('location') == loc)
    .select('date', 'tmin')
    .orderBy('date')
    .toPandas()
)

ax = df_subs_loc.set_index('date')['tmin'].plot(title=f"Daily minimum temperature (tmin), {loc}")
ax.set_ylabel('tmin')
plt.show()



In [0]:
# Data aggregation using PySpark: monthly means for one location and year
import pyspark.sql.functions as fn

# Mark test_df for caching so later actions on it can reuse the materialized data.
test_df.cache()

(
    test_df
    .filter(fn.col('location') == 'Valjevo')
    .filter(fn.year('date') == 2014)  # year() is an integer column
    .groupBy(fn.month('date').alias('month'))
    .agg(fn.mean('prec').alias('avg_prec'), fn.mean('tmin').alias('avg_tmin'))
    .orderBy('month')
    .show()
)


In [0]:
# Ranking: rank every row by tmax, highest first
from pyspark.sql import Window
import pyspark.sql.functions as fn

# The window has no partitionBy, so Spark gathers all rows into one partition to evaluate it.
tmax_window = Window.orderBy(fn.desc('tmax'))
tmax_rank = fn.rank().over(tmax_window).alias('tmaxrank')

test_df.select('date', 'location', 'tmax', tmax_rank).show()
