# Basic Structured Operations

Definitionally, a DataFrame consists of a series of records (like rows in a table) that are of type
Row, and a number of columns (like columns in a spreadsheet) that represent a computation
expression that can be performed on each individual record in the Dataset. Schemas define the
name as well as the type of data in each column. Partitioning of the DataFrame defines the
layout of the DataFrame or Dataset's physical distribution across the cluster, and the partitioning
scheme defines how that data is allocated. You can base the partitioning on the values in a certain
column or let Spark distribute rows nondeterministically.

In [3]:
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

In [4]:
# Read CSV data into a DataFrame
df = spark.read.csv("/home/blackheart/Documents/Data/Apache-Spark/Data/flight_data/2015-summary.csv", header=True, inferSchema=True)
df.show()


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

## Schemas
**A schema defines the column names and types of a DataFrame. We can either let a data source
define the schema (called schema-on-read) or we can define it explicitly ourselves.**

In [5]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



### Creating Your Own Schemas

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Explicit schema: name, type and nullability per column;
# "metadata" attaches arbitrary key/value information to a field
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata={"hello": "world"})
])
df = spark.read.format("json").schema(myManualSchema)\
    .load("/data/flight-data/json/2015-summary.json")

In PySpark, you can create your own schemas when working with DataFrames. A schema defines the structure of your data, specifying the names and data types of columns. Here's how you can create your own schema:

In [8]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Define your own schema
custom_schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),    # True means the column can have null values
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), True)
])

# Read data with the custom schema
# df = spark.read.csv("path/to/your/file.csv", header=True, schema=custom_schema)
df = spark.read.csv("/home/blackheart/Documents/Data/Apache-Spark/Data/flight_data/2015-summary.csv", header=True, schema=custom_schema)


# Show the DataFrame
df.show()



+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

# Columns and Expressions

In PySpark, columns and expressions play a crucial role in manipulating and transforming data within DataFrames. A column represents a single column of data in a DataFrame, and expressions are operations or transformations applied to columns. Here's an explanation with examples:

### Columns:

- **Creating Columns:**
  - Columns can be created in several ways, including by referencing existing columns or applying operations on them.

  ```python
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import col

  # Create a Spark session
  spark = SparkSession.builder.appName("example").getOrCreate()

  # Read data into a DataFrame
  df = spark.read.csv("path/to/your/file.csv", header=True, inferSchema=True)

  # Reference an existing column
  name_col = df["name"]

  # Create a new column using an operation
  age_squared_col = df["age"] ** 2

  # Using the `col` function
  city_col = col("city")
  ```

### Expressions:

- **Defining Expressions:**
  - Expressions represent transformations or operations on columns. PySpark provides a rich set of built-in functions that can be used in expressions.

  ```python
  from pyspark.sql.functions import expr

  # Using expressions
  expr_expr = expr("age * 2")
  ```

### Example Workflow:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Read data into a DataFrame
df = spark.read.csv("path/to/your/file.csv", header=True, inferSchema=True)

# Columns
name_col = df["name"]
age_squared_col = df["age"] ** 2
city_col = col("city")

# Expressions
expr_expr = expr("age * 2")

# Apply transformations using columns and expressions
result_df = df.select(name_col, age_squared_col.alias("age_squared"), city_col, expr_expr.alias("double_age"))

# Show the result
result_df.show()

# Stop the Spark session when done
spark.stop()
```

In this example:

- Columns (`name_col`, `age_squared_col`, `city_col`) are created by referencing existing columns or applying operations.
- Expressions (`expr_expr`) are created using the `expr` function.
- Transformations are applied to the DataFrame using `select` and aliases are used to rename the resulting columns.
- The `show` action is used to display the transformed DataFrame.

Understanding columns and expressions is crucial for performing a wide range of data transformations and manipulations in PySpark. They allow you to create complex operations that can be efficiently executed on large-scale distributed datasets.

## Creating Rows
You can create rows by manually instantiating a Row object with the values that belong in each
column. It’s important to note that only DataFrames have schemas. Rows themselves do not have
schemas. This means that if you create a Row manually, you must specify the values in the same
order as the schema of the DataFrame to which they might be appended (we will see this when
we discuss creating DataFrames):

In [10]:
# in Python
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [11]:
myRow[0]  # 'Hello'
myRow[2]  # 1; a notebook cell displays only the last expression, hence the single output below

1

# Creating DataFrames

In [12]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
    StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()


+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|NULL|    1|
+-----+----+-----+

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType



# Sample data
data = [("Alice", 28, "New York"),
        ("Bob", 35, "San Francisco"),
        ("Charlie", 22, "Los Angeles")]

# Define the schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()


+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 28|     New York|
|    Bob| 35|San Francisco|
|Charlie| 22|  Los Angeles|
+-------+---+-------------+



## select and selectExpr
select and selectExpr allow you to do the DataFrame equivalent of SQL queries on a table of
data:

In [16]:
# Use select to choose specific columns
selected_df = df.select("name", "age")
selected_df.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 28|
|    Bob| 35|
|Charlie| 22|
+-------+---+



In [17]:
# Use selectExpr for more complex transformations
transformed_df = df.selectExpr("name", "age", "age * 2 as double_age", "UPPER(city) as upper_city")
transformed_df.show()

+-------+---+----------+-------------+
|   name|age|double_age|   upper_city|
+-------+---+----------+-------------+
|  Alice| 28|        56|     NEW YORK|
|    Bob| 35|        70|SAN FRANCISCO|
|Charlie| 22|        44|  LOS ANGELES|
+-------+---+----------+-------------+

In this example:

* We use the `select` method to choose specific columns ("name" and "age") from the original DataFrame.
* We use the `selectExpr` method for more complex transformations, such as creating a new column "double_age" and applying the SQL `UPPER` function to the "city" column.
* The `show` method is used to display the selected and transformed DataFrames.

# Converting to Spark Types

When you create a DataFrame in PySpark, each column is assigned a Spark type, either inferred from the data or taken from the schema you supply. If you want to convert a column to a different Spark type explicitly, you can use the `withColumn` method together with the `cast` method of `Column`. Here's an example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", "28", "New York"),
        ("Bob", "35", "San Francisco"),
        ("Charlie", "22", "Los Angeles")]

# Define the schema
schema = ["name", "age", "city"]

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Convert "age" column to IntegerType
df = df.withColumn("age", df["age"].cast(IntegerType()))

# Convert "name" and "city" columns to StringType
df = df.withColumn("name", df["name"].cast(StringType()))
df = df.withColumn("city", df["city"].cast(StringType()))

# Show the DataFrame with converted types
print("DataFrame with Converted Types:")
df.printSchema()
df.show()

# Stop the Spark session when done
spark.stop()
```

In this example:

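- We use the `cast` method to explicitly convert the "age" column, which was created from strings, to `IntegerType`.
- We also use `cast` to convert the "name" and "city" columns to `StringType`; since these columns are already strings, this cast is a no-op here and only illustrates the syntax.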
- The `printSchema` method is used to display the schema of the DataFrame after the type conversions.
- The `show` method is used to display the content of the DataFrame after the type conversions.

Keep in mind that Spark infers data types automatically when you create a DataFrame without an explicit schema; here, "age" was inferred as a string because the input values were strings. Explicit casts are needed when the inferred type is not the one you want.

# Adding Columns

In [29]:
from pyspark.sql.functions import lit
# df.select(expr("*"), lit(1).alias("One")).show(2)
df.withColumn("numberOne", lit(1)).show(2)

+-----+---+-------------+---------+
| name|age|         city|numberOne|
+-----+---+-------------+---------+
|Alice| 28|     New York|        1|
|  Bob| 35|San Francisco|        1|
+-----+---+-------------+---------+
only showing top 2 rows



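# Renaming Columns

`withColumnRenamed` returns a new DataFrame in which one column has been renamed. Note that the `withColumn` call in the previous cell returned a new DataFrame without modifying `df`, so `df` has no `numberOne` column. Renaming a column that does not exist is a no-op rather than an error, which is why the output below still lists only the original columns.
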
In [30]:
df.withColumnRenamed("numberOne", "Loser").columns

['name', 'age', 'city']

In [31]:
df.show()

+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 28|     New York|
|    Bob| 35|San Francisco|
|Charlie| 22|  Los Angeles|
+-------+---+-------------+



# Reserved Characters and Keywords
One thing that you might come across is reserved characters like spaces or dashes in column
names. Handling these means escaping column names appropriately. In Spark, we do this by
using backtick (`) characters. Let's use withColumn, which you just learned about, to create a
column with reserved characters. We'll show two examples: in the one shown here, we don't
need escape characters, but in the next one, we do:

In [32]:
from pyspark.sql.functions import expr, col, column
# The new column name is passed as a plain string, so no escaping is needed
dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("city"))

# Case Sensitivity
By default, Spark is case insensitive; however, you can make Spark case sensitive by setting the
configuration:

```sql
-- in SQL
SET spark.sql.caseSensitive=true
```

# Removing Columns
Now that we’ve created this column, let’s take a look at how we can remove columns from
DataFrames. You likely already noticed that we can do this by using select. However, there is
also a dedicated method called drop:


In [None]:
df.drop("city").columns
# We can drop multiple columns by passing in multiple columns as arguments:
dfWithLongColName.drop("This Long Column-Name", "city")

# Changing a Column’s Type (cast)
Sometimes, we might need to convert from one type to another; for example, if we have a column
of StringType values that should be integers. We can do this by casting the column to the new
type. For instance, let's convert the count column of the flight data from an integer to a Long:

```python
# Assumes df is the flight-data DataFrame loaded earlier, which has a "count" column
df.withColumn("count2", col("count").cast("long"))
```

# Filtering Rows
To filter rows, we create an expression that evaluates to true or false. You then filter out the rows
with an expression that is equal to false. The most common way to do this with DataFrames is to
create either an expression as a String or build an expression by using a set of column
manipulations. There are two methods to perform this operation: you can use where or filter
and they both will perform the same operation and accept the same argument types when used
with DataFrames. We will stick to where because of its familiarity to SQL; however, filter is
valid as well.

In [34]:
df.filter(col("age") > 2).show(2)
df.where("age < 2").show(2)

+-----+---+-------------+
| name|age|         city|
+-----+---+-------------+
|Alice| 28|     New York|
|  Bob| 35|San Francisco|
+-----+---+-------------+
only showing top 2 rows

+----+---+----+
|name|age|city|
+----+---+----+
+----+---+----+



# Getting Unique Rows
A very common use case is to extract the unique or distinct values in a DataFrame. These values
can be in one or more columns. The way we do this is by using the distinct method on a
DataFrame, which allows us to deduplicate any rows that are in that DataFrame. For instance,
let's count the unique combinations of name and age in our dataset. This, of course, is a
transformation that will return a new DataFrame with only unique rows:

In [35]:
df.select("name", "age").distinct().count()

3

# Random Samples
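Sometimes, you might just want to sample some random records from your DataFrame. You can
do this by using the sample method on a DataFrame, which makes it possible for you to specify
a fraction of rows to extract from a DataFrame and whether you'd like to sample with or without
replacement. The fraction is the probability of including each row, not an exact proportion, so
the number of rows returned varies; on a DataFrame as small as this one, a 0.5 sample can even
return every row: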

In [36]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

3

# Random Splits
Random splits can be helpful when you need to break up your DataFrame into random “splits”
of the original DataFrame. This is often used with machine learning algorithms to create training,
validation, and test sets. In this next example, we’ll split our DataFrame into two different
DataFrames by setting the weights by which we will split the DataFrame (these are the
arguments to the function). Because this method is designed to be randomized, we will also
specify a seed (just replace seed with a number of your choosing in the code block). It’s
important to note that if you don’t specify a proportion for each DataFrame that adds up to one,
they will be normalized so that they do:

In [37]:
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

False

# Concatenating and Appending Rows (Union) & Sorting Rows

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data1 = [("Alice", 28, "New York"),
         ("Bob", 35, "San Francisco"),
         ("Charlie", 22, "Los Angeles")]

data2 = [("David", 30, "Chicago"),
         ("Eva", 40, "Boston"),
         ("Frank", 25, "Seattle")]

# Define the schema
schema = ["name", "age", "city"]

# Create DataFrames
df1 = spark.createDataFrame(data1, schema=schema)
df2 = spark.createDataFrame(data2, schema=schema)

# Show the original DataFrames
print("DataFrame 1:")
df1.show()

print("DataFrame 2:")
df2.show()

# Concatenate or append rows using union
concatenated_df = df1.union(df2)

# Show the concatenated DataFrame
print("Concatenated DataFrame:")
concatenated_df.show()

# Sort rows by the "age" column in ascending order
sorted_df = concatenated_df.orderBy("age")

# Show the sorted DataFrame
print("Sorted DataFrame:")
sorted_df.show()




DataFrame 1:
+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 28|     New York|
|    Bob| 35|San Francisco|
|Charlie| 22|  Los Angeles|
+-------+---+-------------+

DataFrame 2:
+-----+---+-------+
| name|age|   city|
+-----+---+-------+
|David| 30|Chicago|
|  Eva| 40| Boston|
|Frank| 25|Seattle|
+-----+---+-------+

Concatenated DataFrame:
+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 28|     New York|
|    Bob| 35|San Francisco|
|Charlie| 22|  Los Angeles|
|  David| 30|      Chicago|
|    Eva| 40|       Boston|
|  Frank| 25|      Seattle|
+-------+---+-------------+

Sorted DataFrame:
+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|Charlie| 22|  Los Angeles|
|  Frank| 25|      Seattle|
|  Alice| 28|     New York|
|  David| 30|      Chicago|
|    Bob| 35|San Francisco|
|    Eva| 40|       Boston|
+-------+---+-------------+


# Limit
Oftentimes, you might want to restrict what you extract from a DataFrame; for example, you
might want just the top ten of some DataFrame. You can do this by using the limit method:

In [39]:
df.limit(5).show()



+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 28|     New York|
|    Bob| 35|San Francisco|
|Charlie| 22|  Los Angeles|
+-------+---+-------------+


# Repartition and Coalesce
Another important optimization opportunity is to partition the data according to some frequently
filtered columns. Partitioning controls the physical layout of data across the cluster, including the
partitioning scheme and the number of partitions.
Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This
means that you should typically only repartition when the future number of partitions is greater
than your current number of partitions or when you are looking to partition by a set of columns.
Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions, which
makes it the cheaper choice when you only need fewer partitions:

In [40]:
df.rdd.getNumPartitions() 

4

In [41]:
df.repartition(5)

DataFrame[name: string, age: int, city: string]

# Collecting Rows to the Driver
As discussed in previous chapters, Spark maintains the state of the cluster in the driver. There are
times when you’ll want to collect some of your data to the driver in order to manipulate it on
your local machine.
Thus far, we did not explicitly define this operation. However, we used several different methods
for doing so that are effectively all the same. collect gets all data from the entire DataFrame,
take selects the first N rows, and show prints out a number of rows nicely.

In [42]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()

+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 28|     New York|
|    Bob| 35|San Francisco|
|Charlie| 22|  Los Angeles|
+-------+---+-------------+

+-------+---+-------------+
|name   |age|city         |
+-------+---+-------------+
|Alice  |28 |New York     |
|Bob    |35 |San Francisco|
|Charlie|22 |Los Angeles  |
+-------+---+-------------+



[Row(name='Alice', age=28, city='New York'),
 Row(name='Bob', age=35, city='San Francisco'),
 Row(name='Charlie', age=22, city='Los Angeles')]

In [43]:
collectDF.toLocalIterator()

<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f70042f0ba0>

# **Thank You!**