### Spark Basic Architecture and Terminology

Un applicazione Spark consiste in un **Driver Program** e di un gruppo di **Executors** sul cluster:

- Il **driver** è il processo che:

    1) esegue il main della Spark Application
    
    2) crea **SparkContext** che permette di stabilire una comunicazione con il cluster ed i "resource manager" al fine di coordinare ed eseguire i **tasks**
    
    
- Gli **executors** sono processi eseguiti sui nodi **worker** del cluster che sono responsabili di eseguire i **task** che il driver ha assegnato a loro.


- Il **cluster manager** (*Mesos* o *YARN*) è responsabile dell'allocazione delle risorse fisiche per la spark application

<img src="https://spark.apache.org/docs/latest/img/cluster-overview.png">

Ogni applicazione Spark necessita di un **entry point** che le consenta di comunicare con basi di dati ed eseguire determinate operazioni come la lettura e la scrittura di dati. Da Spark 2.x questo è entry point è la **SparkSession** che sostituisce i precedenti *SparkContext, SQLContext* e HiveContext di Spark 1.x.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Lettura dei dati

In [None]:
# method 1 for reading a CSV file
df = spark.read.csv(file_path, header=True)

# method 2 for reading a CSV file
df = spark.\ read \
            .format(csv_plugin) \
            .options(header='true', inferSchema='true') \
            .load(file_path)

# Reading a json file
df = spark.read.json(json_file_path)

# Reading a text file
df = spark.read.text(text_file_path)

### Creazione dei DataFrame

##### 1 metodo

In [None]:
from pyspark.sql import Row

# populate two rows with random values
f1 = Row(original_title='Eroica', budget='13393950', year=1992)
f2 = Row(original_title='Night World', budget='1255930', year=1998)

# store the two rows in an array and pass it to Spark
films = [f1, f2]
df = spark.createDataFrame(films)

df.show()

##### 2 metodo: creazione a partire da RDD

In [None]:
from pyspark.sql.types import StringType, StructField, StructType, IntegerType

rdd = spark.textFile(csv_file_path)
schema = StructType([
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
   
df = spark.createDataFrame(rdd, schema)

##### 3 metodo: creazione a partire da query SQL

In [None]:
#eseguire una query sql direttamente su un file parquet
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

## Lettura DataFrame

In [None]:
df.head(5)
data.select("text","date").show()
data.printSchema()
data.describe().show()

## Modifiche DataFrame

##### 1) Creare una colonna

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Create a column with the default value = 'xyz'
df = df.withColumn('new_column', F.lit('xyz'))

# Create a column with default value as null
df = df.withColumn('new_column', F.lit(None).cast(StringType()))

# Create a column using an existing column
df = df.withColumn('new_column', 1.4 * F.col('existing_column'))

# Another example using the MovieLens database
df = df.withColumn('test_col3', F.when(F.col('avg_ratings') < 7, 'OK')\
                                 .when(F.col('avg_ratings') < 8, 'Good')\
                                 .otherwise('Great')).show()

# Create a column using a UDF
def categorize(val):
    if val < 150: 
        return 'bucket_1'
    else:
        return 'bucket_2'
    
my_udf = F.udf(categorize, StringType())

df = df.withColumn('new_column', categorize('existing_column'))

##### 2) Eliminare una colonna

In [None]:
# Remove single column
df.drop('this_column')

# Remove multiple columns in a go
drop_columns = ['this_column', 'that_column']
df.select([col for col in df.columns if column not in drop_columns])

##### 3) Rinominare una colonna

In [None]:
# Changing column name with withColumnRenamed feature
df = df.withColumnRenamed('existing_column_name', 'new_column_name')

# Changing column with selectExpr (you'll have to select all the columns here)
df = df.selectExpr("existing_column_name AS existing_1", "new_column_name AS new_1")

df = df.select(col("existing_column_name").alias("existing_1"), col("new_column_name").alias("new_1"))

##### 4) Sintassi SQL in Spark

In [1]:
# Creates a temporary view using the DataFrame
df.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
query = "SELECT name FROM people WHERE age BETWEEN 13 AND 19"
teenagerNamesDF = spark.sql(query)
teenagerNamesDF.show()

NameError: name 'spark' is not defined