# Spark

This page covers PySpark, the Python API for Spark.

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark_session = SparkSession.builder.appName('Temp').getOrCreate()

## Dataframe

Spark SQL provides the `DataFrame` object, which offers a way to interact with tabular data.

You can define a data frame: 

- Directly from your code using the `createDataFrame` method of the session object.
- By reading from external sources with the reader methods available through the `read` attribute of the session.

---

The following cell defines a Spark data frame from a list of tuples, where each tuple is a row and its values correspond to the columns, and then shows it. Because no column names are given, Spark assigns the default names `_1` and `_2`.

In [14]:
df = spark_session.createDataFrame(
    data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)]
)
df.show()

+-----+---+
|   _1| _2|
+-----+---+
|Alice| 25|
|  Bob| 30|
|Cathy| 35|
+-----+---+



The following cell shows an alternative way to define the same data frame. Each row here is represented as a dictionary whose keys correspond to the column names.

In [16]:
df = spark_session.createDataFrame(
    data=[
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30},
        {"name": "Cathy", "age": 35}
    ]
)
df.show()

+---+-----+
|age| name|
+---+-----+
| 25|Alice|
| 30|  Bob|
| 35|Cathy|
+---+-----+



## Read CSV

Use the `read.csv` method of the Spark session to read a CSV file.

---

The following cell reads the `spark_files/scv_example.csv` file that I prepared earlier.

In [4]:
spark = SparkSession.builder.appName("Temp").getOrCreate()
df = spark.read.csv(
    "spark_files/scv_example.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape=','
)
display(df)

DataFrame[Name: string,  Age: double,  Salary: double]

### Schema

Use the `schema` argument to define the schema explicitly instead of letting Spark infer it. The schema can be specified as a simple string that pairs each column name with its expected data type.

---

The following cell assigns the `int` data type to the `Age` column instead of the `double` type that was inferred in the previous example.

In [10]:
schema = """
Name string,
Age int,
Salary double
"""

# header=True skips the header row, which would otherwise be read as data.
spark.read.csv(
    "spark_files/scv_example.csv",
    header=True,
    schema=schema
)

DataFrame[Name: string, Age: int, Salary: double]

## Columns

A data frame consists of a set of columns. There are two ways to refer to a column:

- Each column is available as an attribute of the data frame, for example `test_df.column1`.
- The `pyspark.sql.functions.col` function defines a reference to a column by name. The reference is not tied to any data frame; when it is applied to a particular data frame, it is interpreted as the column with that name in that data frame.

Spark uses these column references in operations such as `withColumn` and `filter`.

---

The following cell defines the data frame that will be used as an example.

In [45]:
test_df = spark_session.createDataFrame(
    data=[
        (8, 20),
        (9, 43),
        (15, 88)
    ],
    schema=["column1", "column2"]
)
test_df.show()

+-------+-------+
|column1|column2|
+-------+-------+
|      8|     20|
|      9|     43|
|     15|     88|
+-------+-------+



The following cell applies `filter` with a condition built from a direct reference to `test_df.column1`.

In [49]:
condition = (test_df.column1 > 10)
print(type(condition))
test_df.filter(condition=condition).show()

<class 'pyspark.sql.classic.column.Column'>
+-------+-------+
|column1|column2|
+-------+-------+
|     15|     88|
+-------+-------+



Alternatively, the next cell defines `calculation` using `col("column2")`, a reference that is not bound to any data frame. The `withColumn` method of `test_df` resolves it to the `column2` column that `test_df` contains.

In [None]:
calculation = col("column2") + 8
test_df.withColumn("result", calculation).show()

+-------+-------+------+
|column1|column2|result|
+-------+-------+------+
|      8|     20|    28|
|      9|     43|    51|
|     15|     88|    96|
+-------+-------+------+



## Computations

The data frame object provides a `withColumn` method for computing columns. It takes two arguments:

- The name of the column in which the result should be stored. If the column doesn't exist, it is created in the output data frame; if it exists, it is replaced.
- The column object or computational expression that defines the new column.

---

The following cell creates the data frame that we will use for our experiments.

In [28]:
test_df = spark_session.createDataFrame(
    data=[
        (8, "value1"),
        (9, "value2")
    ],
    schema=["numbers", "strings"]
)
test_df.show()

+-------+-------+
|numbers|strings|
+-------+-------+
|      8| value1|
|      9| value2|
+-------+-------+



The following code overwrites the `numbers` column of the example data frame by using the `withColumn` method.

In [None]:
test_df.withColumn(
    "numbers",
    col("numbers") + 90
).show()

+-------+-------+
|numbers|strings|
+-------+-------+
|     98| value1|
|     99| value2|
+-------+-------+



## Group by

The data frame has a `groupBy` method, which returns a `GroupedData` object. This object provides a set of tools for building aggregations over the data:

| Method          | Description                                                                                                                                  |
| --------------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
| `agg`           | General aggregation with one or more expressions.                                                                                            |
| `avg`           | Computes the average of the given columns for each group.                                                                                    |
| `mean`          | Alias for `avg()`.                                                                                                                           |
| `max`           | Maximum value of each given column for each group.                                                                                           |
| `min`           | Minimum value of each given column for each group.                                                                                           |
| `sum`           | Sum of values of each given column for each group.                                                                                           |
| `count`         | Count of rows for each group.                                                                                                                |
| `pivot`         | Performs a pivot (like SQL `PIVOT`) on the specified column, turning its values into new columns; it must be followed by an aggregation.     |
| `applyInPandas` | Applies a function to each group as a Pandas DataFrame and returns a new DataFrame.                                                          |
| `apply`         | Like `applyInPandas`, but takes a `pandas_udf` instead of a plain Python function; it also returns a DataFrame. `applyInPandas` is preferred. |

See the [`groupby`](spark/groupby.ipynb) page for more details.

---

The following cell defines an example data frame. It constructs and shows the `GroupedData` object based on it.

In [66]:
test_df = spark_session.createDataFrame(
    data=[
        ("a", 3),
        ("a", 2),
        ("c", 4),
        ("c", 7)
    ],
    schema=['group', 'value']
)

grouped_expression = test_df.groupBy('group')
grouped_expression

GroupedData[grouping expressions: [group], value: [group: string, value: bigint], type: GroupBy]

The following code shows how to use the `agg` function to compute the aggregations based on the groups.

In [68]:
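# Import the module under an alias so that sum, min and max
# do not shadow the Python built-ins of the same name.
from pyspark.sql import functions as F

grouped_expression.agg(
    F.sum('value'),
    F.avg('value'),
    F.min('value'),
    F.max('value')
).show()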

+-----+----------+----------+----------+----------+
|group|sum(value)|avg(value)|min(value)|max(value)|
+-----+----------+----------+----------+----------+
|    a|         5|       2.5|         2|         3|
|    c|        11|       5.5|         4|         7|
+-----+----------+----------+----------+----------+

