<a href="https://colab.research.google.com/github/YagyanshB/ApacheSpark/blob/main/pandas_api_sparek_introduction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Importing the required libraries

import numpy as np
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession



# Object Creation:

Creating a pandas-on-Spark Series by passing a list of values, letting pandas API on Spark create a default integer index:

In [2]:
# pandas-on-Spark Series built from a list; np.nan stands in for a missing value.
s = ps.Series([1, 3, 5, np.nan, 8, 10])

In [3]:
s

0     1.0
1     3.0
2     5.0
3     NaN
4     8.0
5    10.0
dtype: float64

In [4]:
# Plain pandas Series (not pandas-on-Spark) with the default integer index.
a = pd.Series(data=[1, 2, 3])

In [5]:
a

Unnamed: 0,0
0,1
1,2
2,3


Creating a pandas-on-Spark DataFrame by passing a dict of objects that can each be converted to a series-like structure:



In [6]:
# pandas-on-Spark DataFrame: two integer columns, one string column,
# and an explicit index instead of the default one.
psdf = ps.DataFrame(
    data={
        "a": [1, 2, 3, 4, 5, 6],
        "b": [100, 200, 300, 400, 500, 600],
        "c": ["one", "two", "three", "four", "five", "six"],
    },
    index=[10, 20, 30, 40, 50, 60],
)

In [8]:
# Plain pandas DataFrame: two integer columns with an explicit index of 1..4.
pddf = pd.DataFrame(
    data={
        "a": [1, 2, 3, 4],
        "b": [100, 200, 300, 400],
    },
    index=[1, 2, 3, 4],
)

In [9]:
psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [10]:
pddf

Unnamed: 0,a,b
1,1,100
2,2,200
3,3,300
4,4,400


In [11]:
# Get the active SparkSession, or create one with default settings if none exists yet.
spark = SparkSession.builder.getOrCreate()

In [13]:
# Build a Spark DataFrame from the pandas DataFrame `pddf`.
sdf = spark.createDataFrame(pddf)

In [14]:
sdf.show()

+---+---+
|  a|  b|
+---+---+
|  1|100|
|  2|200|
|  3|300|
|  4|400|
+---+---+



In [15]:
pddf.T

Unnamed: 0,1,2,3,4
a,1,2,3,4
b,100,200,300,400


In [19]:
pddf.mean()

Unnamed: 0,0
a,2.5
b,250.0


In [20]:
# Benchmark setup: remember the current Arrow setting and adjust pandas-on-Spark options.
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")  # Save the current value before it is overridden.
ps.set_option("compute.default_index_type", "distributed")  # Switch to the "distributed" default index type; intended to avoid default-index overhead.
import warnings
warnings.filterwarnings("ignore")  # Meant to hide Arrow-optimization warnings. NOTE(review): with no category/module filter this ignores ALL warnings.

In [21]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()

450 ms ± 33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [23]:
# Create a Spark Session
# NOTE(review): a session is already created earlier in this notebook, so
# getOrCreate() presumably returns that existing session and the appName
# given here may not take effect — confirm.

spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

In [24]:
# Sample rows whose names contain runs of several spaces between the parts.
sample_data = [
    {"name": "John    D.", "age": 30},
    {"name": "Alice   G.", "age": 25},
    {"name": "Bob  T.", "age": 35},
    {"name": "Eve   A.", "age": 28},
]

# Build a Spark DataFrame from the list of row dicts.
df = spark.createDataFrame(sample_data)

In [27]:
df.show()

+---+----------+
|age|      name|
+---+----------+
| 30|John    D.|
| 25|Alice   G.|
| 35|   Bob  T.|
| 28|  Eve   A.|
+---+----------+



In [28]:
from pyspark.sql.functions import col, regexp_replace

# Remove additional spaces in name
def remove_extra_spaces(df, column_name):
    """Collapse each run of whitespace in one column into a single space.

    Args:
        df: Spark DataFrame containing the column to clean.
        column_name: Name of the column whose values are rewritten.

    Returns:
        A new DataFrame in which ``column_name`` is replaced by a version
        where every run of one or more whitespace characters becomes a
        single space. The input DataFrame is not modified.
    """
    collapsed = regexp_replace(col(column_name), "\\s+", " ")
    return df.withColumn(column_name, collapsed)

transformed_df = remove_extra_spaces(df, "name")

transformed_df.show()

+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+



In [29]:
import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual

# Example 1: two DataFrames built from the same rows and the same column names.
df1 = spark.createDataFrame(
    data=[("1", 1000), ("2", 3000)],
    schema=["id", "amount"],
)
df2 = spark.createDataFrame(
    data=[("1", 1000), ("2", 3000)],
    schema=["id", "amount"],
)

# Passes silently because both DataFrames hold identical data.
assertDataFrameEqual(df1, df2)

In [31]:
import unittest

class PySparkTestCase(unittest.TestCase):
    """Base test case that exposes one SparkSession as ``cls.spark``."""

    @classmethod
    def setUpClass(cls):
        """Obtain a SparkSession and store it on the class before any test runs."""
        builder = SparkSession.builder.appName("Testing PySpark Example")
        cls.spark = builder.getOrCreate()

    @classmethod
    def tearDownClass(cls):
        """Stop the class-level SparkSession after all tests have finished."""
        cls.spark.stop()

In [32]:
from pyspark.testing.utils import assertDataFrameEqual

class TestTranformation(PySparkTestCase):
    """Tests for the ``remove_extra_spaces`` transformation."""

    def test_single_space(self):
        """Runs of spaces in ``name`` collapse to one space; ``age`` is unchanged."""
        sample_data = [
            {"name": "John    D.", "age": 30},
            {"name": "Alice   G.", "age": 25},
            {"name": "Bob  T.", "age": 35},
            {"name": "Eve   A.", "age": 28},
        ]

        # Use the session provided by PySparkTestCase.setUpClass rather than a
        # notebook-global `spark`, so the test does not rely on hidden kernel state.
        original_df = self.spark.createDataFrame(sample_data)

        # Apply the transformation under test.
        transformed_df = remove_extra_spaces(original_df, "name")

        expected_data = [
            {"name": "John D.", "age": 30},
            {"name": "Alice G.", "age": 25},
            {"name": "Bob T.", "age": 35},
            {"name": "Eve A.", "age": 28},
        ]
        expected_df = self.spark.createDataFrame(expected_data)

        assertDataFrameEqual(transformed_df, expected_df)
