## PySpark

A PySpark application starts by initializing a `SparkSession`, the entry point to PySpark, as shown below. When you run code in the PySpark shell (the `pyspark` executable), the shell creates the session automatically and exposes it as the variable `spark`.

In [2]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("SimplePySparkExample").getOrCreate()

### DataFrame Creation

- `SparkSession.createDataFrame` builds a DataFrame, typically from a list of lists, tuples, dictionaries, or `pyspark.sql.Row` objects, or from a pandas DataFrame.
- `createDataFrame` also accepts a `schema` argument that specifies the schema of the DataFrame.

In [None]:
# Read a CSV file into a DataFrame (use the header row as column names and infer column types)
file_path = r"D:\Python\8.Pandas\data.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.show()

# Perform a basic operation (count the number of rows)
row_count = df.count()
print(f"Number of rows: {row_count}")

### Viewing Data

The top rows of a DataFrame can be displayed with `DataFrame.show()`; pass a number to limit how many rows are printed (20 by default).

In [None]:
df.show(1)

Enable the `spark.sql.repl.eagerEval.enabled` configuration for eager evaluation of PySpark DataFrames in notebooks such as Jupyter: a DataFrame is then rendered as a table simply by evaluating it in a cell. The number of rows shown is controlled by the `spark.sql.repl.eagerEval.maxNumRows` configuration.

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

In [None]:
# The rows can also be shown vertically. This is useful when rows are too long to show horizontally.
df.show(1, vertical=True)  # vertical=False (the default) prints rows horizontally

You can list the DataFrame's column names with `DataFrame.columns`:

In [None]:
df.columns

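Print the schema with `DataFrame.printSchema()`, and compute summary statistics (count, mean, stddev, min, max) for selected columns with `DataFrame.describe()`: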

In [None]:
df.printSchema()

In [None]:
df.select('Age','Salary').describe().show()

`DataFrame.collect()` gathers the distributed data to the driver as a local Python list of `Row` objects. It can raise an out-of-memory error when the dataset is too large to fit on the driver, because it brings all the data from the executors to the driver.

In [None]:
df.collect()

In [None]:
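# To fetch only a few rows instead of everything, use DataFrame.take() (first n rows)
# or DataFrame.tail() (last n rows); both return a list of Row objects
print(df.tail(1))
print(df.take(2))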

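A PySpark DataFrame can also be converted to a pandas DataFrame with `DataFrame.toPandas()` to use the pandas API. Like `collect()`, this brings all the data to the driver, so use it only on data that fits in driver memory.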

In [None]:
df.toPandas()

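### Selecting and Accessing Data

PySpark DataFrames are lazily evaluated: accessing a column such as `df.Salary` returns a `Column` expression rather than the data itself.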

In [None]:
df.Salary

In [None]:
from pyspark.sql.functions import upper

# DataFrame.select() takes Column instances and returns a new DataFrame
df.select(df.Name).show()

To add a new column, pass a name and a `Column` expression to `DataFrame.withColumn()`, which returns a new DataFrame:

In [None]:
df.withColumn('Upper_Name',upper(df.Name)).show()

In [None]:
# withColumn() returned a new DataFrame; the original df is unchanged
df.show()

To select a subset of rows, use DataFrame.filter().

In [None]:
df.filter(df.Age > 25).show()


### Grouping Data

In [None]:
# Read a CSV file into a DataFrame
file_path = r"D:\Python\8.Pandas\data1.csv"
df1 = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df1.show()

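PySpark DataFrames support the split-apply-combine strategy: group the rows by a column, then apply an aggregate function such as `avg()` to each group. Called with no arguments, `avg()` averages every numeric column.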

In [None]:
df1.groupby('color').avg().show()
#df1.groupby('color').sum().show()
#df1.groupby('color').count().show()
#df1.groupby('color').min().show()
df1.groupby('color').max().show()
df1.groupby('color').mean().show()

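`cogroup()` pairs the groups of two DataFrames by key, and `applyInPandas()` passes each pair to a Python function as two pandas DataFrames; the function's result must match the given schema. This requires pandas and PyArrow.

In [None]:
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

# Called once per 'id' with one pandas DataFrame from each side;
# merges them on their shared columns ('time', 'id') in sorted order
def merge_ordered(l, r):
    return pd.merge_ordered(l, r)

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    merge_ordered, schema='time int, id int, v1 double, v2 string').show()

In [None]:
# Stop the Spark session once all the work is done
spark.stop()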