# **PySpark Utility Suite**

In [1]:
# Installation
!pip install pyspark



In [4]:
# Import and session init
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

In [None]:
# Create a custom DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType(
    [StructField('id', IntegerType(), False),
     StructField('name', StringType(), True)])

data = [(1, 'Nome'), (2, None)]

my_df = spark.createDataFrame(data, schema)
my_df.show()

+---+----+
| id|name|
+---+----+
|  1|Vito|
|  2|NULL|
+---+----+



In [9]:
# Read a CSV File
df = spark.read.csv('/content/sample_data/california_housing_train.csv')

df.printSchema()
df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|        _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|76

In [None]:
# Testing With GreatExpectations
!pip install great_expectations
#!pip install --upgrade great_expectations



In [28]:
import great_expectations as gx

# Retrieve your Data Context
context = gx.get_context()

# Define the Data Source name
data_source_name = "my_data_source"

# Add the Data Source to the Data Context
data_source = context.data_sources.add_spark(name=data_source_name)
# Define the Data Asset name
data_asset_name = "my_dataframe_data_asset"

# Add a Data Asset to the Data Source
data_asset = data_source.add_dataframe_asset(name=data_asset_name)

batch_parameters = {"dataframe": my_df}

# Retrieve the dataframe Batch Definition
batch_definition_name = "my_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)

batch_definition = (
    context.data_sources.get(data_source_name)
    .get_asset(data_asset_name)
    .get_batch_definition(batch_definition_name)
)

# Create an Expectation to test
# expectation = gx.expectations.ExpectColumnToExist(column='id')

expectation_list = [gx.expectations.ExpectColumnToExist(column='id'),
                gx.expectations.ExpectColumnValuesToNotBeNull(column='name'),
                gx.expectations.ExpectColumnValuesToBeUnique(column='id')]

# Get the dataframe as a Batch
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

# Test the Expectation
for expectation in expectation_list:
  # Validate the Batch
  validation_results = batch.validate(expectation)
  print(validation_results)



INFO:great_expectations.data_context.types.base:Created temporary directory '/tmp/tmp_yf3a04r' for ephemeral docs site


Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

{
  "success": true,
  "expectation_config": {
    "type": "expect_column_to_exist",
    "kwargs": {
      "batch_id": "my_data_source-my_dataframe_data_asset",
      "column": "id"
    },
    "meta": {}
  },
  "result": {},
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

{
  "success": false,
  "expectation_config": {
    "type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "batch_id": "my_data_source-my_dataframe_data_asset",
      "column": "name"
    },
    "meta": {}
  },
  "result": {
    "element_count": 2,
    "unexpected_count": 1,
    "unexpected_percent": 50.0,
    "partial_unexpected_list": [
      null
    ],
    "partial_unexpected_counts": [
      {
        "value": null,
        "count": 1
      }
    ]
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


Calculating Metrics:   0%|          | 0/12 [00:00<?, ?it/s]

{
  "success": true,
  "expectation_config": {
    "type": "expect_column_values_to_be_unique",
    "kwargs": {
      "batch_id": "my_data_source-my_dataframe_data_asset",
      "column": "id"
    },
    "meta": {}
  },
  "result": {
    "element_count": 2,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_counts": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
