# Theory

## Earlier implementation
In the earlier, RDD-based implementation, an RDD has a compute function that produces an Iterator[T] for the data that will be stored in the RDD.

The compute function (or computation) is opaque to Spark. That is, Spark does not know what you are doing in the compute function: whether you are performing a join, filter, select, or aggregation, Spark sees only a lambda expression. Another problem is that the Iterator[T] data type is also opaque for Python RDDs; Spark only knows that it’s a generic object in Python.
Furthermore, because it cannot inspect the computation or expression in the function, Spark has no way to optimize it.
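To make this opacity concrete, here is a toy model in plain Python. `ToyRDD` is a hypothetical class written for illustration, not Spark's RDD implementation: each dataset is defined only by a compute function that yields an iterator, and every transformation simply wraps that function in another closure, so nothing outside the closures can tell a filter from a map.

```python
from typing import Callable, Iterator, List


class ToyRDD:
    """Hypothetical toy model (not Spark's real RDD class).

    A dataset is defined only by a zero-argument compute function that
    returns an iterator. The "engine" can call compute(), but it cannot
    look inside the function to see whether it maps, filters, or aggregates.
    """

    def __init__(self, compute: Callable[[], Iterator]):
        self.compute = compute

    def map(self, f: Callable) -> "ToyRDD":
        parent = self
        # Wrap the parent's compute function in a new, equally opaque closure
        return ToyRDD(lambda: (f(x) for x in parent.compute()))

    def filter(self, pred: Callable) -> "ToyRDD":
        parent = self
        return ToyRDD(lambda: (x for x in parent.compute() if pred(x)))

    def collect(self) -> List:
        # Materialize the data by running the whole chain of closures
        return list(self.compute())


ages = ToyRDD(lambda: iter([20, 31, 30, 35, 25]))
older = ages.map(lambda a: a + 1).filter(lambda a: a > 30)
print(older.collect())  # [32, 31, 36]
```

Because `older.compute` is just a function object, an engine holding it can run it but cannot rewrite it, for example by moving the filter ahead of the map.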


## New implementation
Spark 2.x introduced a few key schemes for structuring Spark. One is to express computations by using common patterns found in data analysis. These patterns are expressed as high-level operations such as filtering, selecting, counting, aggregating, averaging, and grouping. This provides added clarity and simplicity.

This specificity is further narrowed through a set of common operators in a domain-specific language (DSL). These operators, available as APIs in Spark’s supported languages (Java, Python, Scala, R, and SQL), let you tell Spark what you wish to compute with your data; as a result, Spark can construct an efficient query plan for execution.

The final scheme of order and structure is to let you arrange your data in a tabular format, like a SQL table or spreadsheet, with supported structured data types.

# Getting to work
We want to aggregate all the ages for each name: group by name, then average the ages.

## Low-level RDD API

In [33]:
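from pyspark import SparkConf, SparkContext

In [34]:
# Only one SparkContext can run per process; getOrCreate reuses an existing one
conf = SparkConf().setMaster("local").setAppName("SparkOldImplementation")
sc = SparkContext.getOrCreate(conf)
# Create an RDD of (name, age) tuples
data_rdd = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])

# Map to (name, (age, 1)), sum ages and counts per name, then divide sum by count
ages_rdd = (data_rdd
            .map(lambda x: (x[0], (x[1], 1)))
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
            .map(lambda x: (x[0], x[1][0] / x[1][1])))
for age_info in ages_rdd.collect():
    print(age_info)
sc.stop()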

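The RDD pipeline carries an (age, 1) pair instead of averaging directly, because reduceByKey needs a merge function that is associative and commutative: sums and counts can be combined in any order, while an unweighted average of averages generally gives the wrong answer. The same three steps can be traced in plain Python with no Spark required; the dictionary loop below only stands in for reduceByKey on a single machine.

```python
data = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]

# Step 1 (like the first .map): pair each age with a count of 1
pairs = [(name, (age, 1)) for name, age in data]


# Step 2 (like .reduceByKey): merge (sum, count) pairs that share a key
def merge(x, y):
    return (x[0] + y[0], x[1] + y[1])


totals = {}
for name, value in pairs:
    totals[name] = merge(totals[name], value) if name in totals else value

# Step 3 (like the final .map): divide the summed age by the count
averages = {name: s / c for name, (s, c) in totals.items()}
print(averages)  # {'Brooke': 22.5, 'Denny': 31.0, 'Jules': 30.0, 'TD': 35.0}
```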

## Using the structured APIs

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

In [None]:
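spark = SparkSession.builder.appName("SparkStructuredApis").getOrCreate()

# Create a DataFrame of (name, age) rows with named columns
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])
# Group by name and average the ages; Spark builds and optimizes the query plan
ages_df = data_df.groupBy("name").agg(avg("age"))
ages_df.show()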

In [None]:
spark.stop()

# DataFrames

## Schemas and creating DataFrames
A schema in Spark defines the column names and associated data types for a DataFrame.

It is better to define a schema up front than to leave Spark to infer it, which can be expensive for large files. Defining the schema up front has three benefits:
- You relieve Spark from the onus of inferring data types.
- You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
- You can detect errors early if data doesn’t match the schema.

### Two ways to define a schema:
1. Define it programmatically, with StructType and StructField objects.
2. Use a Data Definition Language (DDL) string, which is simpler and easier to read.

In [None]:
# Programmatic definition example
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
schema = StructType([StructField("author", StringType(), False),
                     StructField("title", StringType(), False),
                     StructField("pages", IntegerType(), False)])

In [None]:
# Definition via a DDL string
schema = "author STRING, title STRING, pages INT"

In [None]:
# full example for schema definition and creation
from pyspark.sql import SparkSession

schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

# Create our static data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
        [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
        [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
        [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
        [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
        [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
       ]

spark = (SparkSession
.builder
.appName("Example-3_6")
.getOrCreate())
# Create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema)
# Show the DataFrame; it should reflect our table above
blogs_df.show()
# Print the schema used by Spark to process the DataFrame
# (printSchema() prints the tree itself and returns None, so don't wrap it in print)
blogs_df.printSchema()

In [None]:
# Public property returning the schema as a StructType
blogs_df.schema

In [None]:
spark.stop()