# PySpark (on GCP)

<b>Create a cluster:</b>

<i>gcloud dataproc clusters create CLUSTERNAME --region=REGION</i>


<b>To delete:</b>

<i>gcloud dataproc clusters delete CLUSTERNAME --region=REGION</i>


<b>To list created clusters:</b>

<i>gcloud dataproc clusters list --region=REGION</i>

The cluster's VMs also appear in the Cloud Console under "Compute Engine" -> "VM instances".


<b>Connect to the cluster over SSH. MASTERNODE is the cluster name with "-m" appended, and ZONE is the cluster's region with a zone letter suffix "-X" (check "VM instances" for the exact zone):</b>

<i>gcloud compute ssh MASTERNODE --project=PROJECT --zone=ZONE</i>


<b>Start a PySpark session by running:</b>

<i>pyspark</i>


<b>Submit a job (CLUSTERNAME is the cluster name, without "-m"):</b>

<i>gcloud dataproc jobs submit pyspark --cluster=CLUSTERNAME --region=REGION ps_script.py -- SCRIPT-ARGS</i>
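
Everything after the `--` separator is passed to the script as command-line arguments, so it arrives in `sys.argv` after the script path. A minimal sketch of reading those arguments follows; the helper name `parse_args` and the `gs://bucket_name/...` paths are illustrative assumptions, not part of any Dataproc API.

```python
def parse_args(argv):
    """Return (input_location, output_location) from an argv-style list.

    argv[0] is the script path; the remaining items are SCRIPT-ARGS.
    """
    if len(argv) != 3:
        raise SystemExit('Usage: ' + argv[0] + ' <in> <out>')
    return argv[1], argv[2]


# Roughly what sys.argv looks like for
#   ... ps_script.py -- gs://bucket_name/in.csv gs://bucket_name/out
# (hypothetical paths; argv[0] may be a full path on the cluster)
example_argv = ['ps_script.py', 'gs://bucket_name/in.csv', 'gs://bucket_name/out']
inputlocation, outputlocation = parse_args(example_argv)
print(inputlocation, outputlocation)
```

In a real script, call `parse_args(sys.argv)` after `import sys`.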

## PySpark code:

<b>Read data:</b>

<i>data = sc.textFile('')</i>



<b>Show the data (collect() returns every element to the driver, so use it only on small RDDs):</b>

<i>data.collect()</i>


<b>Show the contents of a file in Cloud Storage (a shell command, run outside the PySpark session):</b>

<i>gsutil cat gs://bucket_name/numbers.txt</i>


<b>Apply some function:</b>
1. Map

   ><i>stripped = data.map(lambda line: line.strip())</i>
2. Reduce

   ><i>intdata.reduce(lambda x,y: x+y)</i>
3. Sample(withReplacement, fraction, [seed])

   ><i>samp = data.sample(True, 0.5)</i>
4. Filter with a predefined function

   ><i>from prime import is_prime

   >primes = data.filter(is_prime)</i>
    
5. Keep only distinct elements

   ><i>fields_distinct = fields.distinct()</i>
6. Produce a list for each element in the RDD, then concatenate those lists

   ><i>flattened = data.flatMap(lambda line: [x for x in line.split()])</i>
7. Count the elements

   ><i>uniqwords.count()</i>
8. Count how many times each key appears, ignoring the values

   ><i>words = data_flat.map(lambda w: (w.lower(), 0))

   >words.countByKey()</i>
    
9. Aggregate the values for each key using the given reduce function

   ><i>wordcounts = wordkeys.reduceByKey(lambda x, y: x+y)</i>
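
The operations above can be chained. A minimal word-count sketch, assuming it runs inside the `pyspark` shell where `sc` already exists; the function names `tokenize` and `count_words` are made up for illustration.

```python
def tokenize(line):
    """Split a line on whitespace and lower-case each word."""
    return [word.lower() for word in line.split()]


def count_words(sc, lines):
    """Count word occurrences using flatMap -> map -> reduceByKey.

    sc is an existing SparkContext; lines is a small list of strings.
    """
    data = sc.parallelize(lines)                    # RDD of lines
    words = data.flatMap(tokenize)                  # RDD of words
    pairs = words.map(lambda w: (w, 1))             # (word, 1) pairs
    counts = pairs.reduceByKey(lambda x, y: x + y)  # sum the 1s per word
    return dict(counts.collect())                   # small result, safe to collect
```

In the shell, `count_words(sc, ['a b', 'B c'])` should report a count of 2 for `b` and 1 each for `a` and `c`.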


More transformations and actions are documented here:
    
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#actions

## Tasks

#### Given a CSV file with the format DATE, TMAX, TMIN, find the average maximum and minimum temperature for each year

# Libraries
from pyspark import SparkConf, SparkContext
import sys


# Check that input and output files are specified
if len(sys.argv) != 3:
    print('Usage: ' + sys.argv[0] + ' <in> <out>')
    sys.exit(1)
inputlocation = sys.argv[1]
outputlocation = sys.argv[2]


# Configure PySpark variable
conf = SparkConf().setAppName('Temperature')
sc = SparkContext(conf=conf)


# Actual code
data = sc.textFile(inputlocation)

# split each line into [YYYY-MM-DD, TMAX, TMIN]
data_split = data.map(lambda line: line.split(','))
# key by year: [YYYY, [TMAX, TMIN, 1]] (the 1 counts one record)
data_year = data_split.map(lambda el: [el[0][:4], [int(el[1]), int(el[2]), 1]])
# sum per year: [YYYY, [sum of TMAX, sum of TMIN, number of records]]
data_iter = data_year.reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1], x[2] + y[2]])
# divide the sums by the count: [YYYY, [average TMAX, average TMIN]]
data_out = data_iter.map(lambda el: [el[0], [el[1][0]/el[1][2], el[1][1]/el[1][2]]])


# Save the output and stop PySpark
data_out.saveAsTextFile(outputlocation)
sc.stop()
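
The averaging step works by carrying a running [sum of TMAX, sum of TMIN, count] per year, so a single division at the end yields the means. Below is a small local sketch of the same logic without Spark, using `functools.reduce` in place of `reduceByKey`. The function names and the three sample rows are invented for illustration.

```python
from functools import reduce


def to_record(line):
    """'YYYY-MM-DD,TMAX,TMIN' -> (YYYY, [TMAX, TMIN, 1])."""
    date, tmax, tmin = line.split(',')
    return date[:4], [int(tmax), int(tmin), 1]


def add_records(x, y):
    """Element-wise sum of two [TMAX, TMIN, count] lists."""
    return [x[0] + y[0], x[1] + y[1], x[2] + y[2]]


def to_average(sums):
    """[sum of TMAX, sum of TMIN, count] -> [mean TMAX, mean TMIN]."""
    return [sums[0] / sums[2], sums[1] / sums[2]]


# Invented sample rows, all from the same year
lines = ['2000-01-01,10,2', '2000-01-02,14,4', '2000-01-03,12,0']
values = [to_record(line)[1] for line in lines]
averages = to_average(reduce(add_records, values))
print(averages)  # [12.0, 2.0]
```

In the Spark job, the same named functions could stand in for the lambdas, for example `data.map(to_record).reduceByKey(add_records).mapValues(to_average)`, which also makes each step testable on its own.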

#### Given a CSV file with the format DATE, TMAX, TMIN, find the dates (MM-DD) of the maximum and minimum temperature for each year

# Libraries
from pyspark import SparkConf, SparkContext
import sys


# Check that input and output files are specified
if len(sys.argv) != 3:
    print('Usage: ' + sys.argv[0] + ' <in> <out>')
    sys.exit(1)
inputlocation = sys.argv[1]
outputlocation = sys.argv[2]


# Configure PySpark variable
conf = SparkConf().setAppName('Temperature')
sc = SparkContext(conf=conf)


# Actual code
data = sc.textFile(inputlocation)

# split the data ([YYYY-MM-DD, TMAX, TMIN])
data_split = data.map(lambda line: line.split(','))

# build separate RDDs for TMAX and TMIN, to be joined later
# key by year: [YYYY, [TMAX, MM-DD]]
data_max = data_split.map(lambda el: [el[0][:4], [int(el[1]), el[0][5:]]])
# key by year: [YYYY, [TMIN, MM-DD]]
data_min = data_split.map(lambda el: [el[0][:4], [int(el[2]), el[0][5:]]])

def larger_element(x, y):
    if x[0] > y[0]:
        return x
    return y

def smaller_element(x, y):
    if x[0] < y[0]:
        return x
    return y

data_max_reduced = data_max.reduceByKey(larger_element)
data_min_reduced = data_min.reduceByKey(smaller_element)

# join the two RDDs on year: (YYYY, ([TMAX, MM-DD], [TMIN, MM-DD]))
data_out = data_max_reduced.join(data_min_reduced)

# Save the output and stop PySpark
data_out.saveAsTextFile(outputlocation)
sc.stop()
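
One detail of the reducers worth checking is their behaviour on ties: `larger_element` and `smaller_element` both return `y` when the two temperatures are equal. Below is a local sketch with `functools.reduce`, using invented sample values. In Spark the order in which pairs are combined is not guaranteed, so which of several tied dates is kept may vary between runs.

```python
from functools import reduce


def larger_element(x, y):
    """Return the [temperature, MM-DD] pair with the higher temperature."""
    if x[0] > y[0]:
        return x
    return y


def smaller_element(x, y):
    """Return the [temperature, MM-DD] pair with the lower temperature."""
    if x[0] < y[0]:
        return x
    return y


# Invented [temperature, MM-DD] pairs for a single year
tmax = [[30, '07-01'], [35, '07-15'], [35, '08-02']]
tmin = [[-5, '01-10'], [-8, '02-03'], [-2, '12-20']]

hottest = reduce(larger_element, tmax)
coldest = reduce(smaller_element, tmin)
print(hottest)  # [35, '08-02'] (the later of the two tied dates here)
print(coldest)  # [-8, '02-03']
```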