In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()
!pip install openpyxl

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/19 06:42:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Defaulting to user installation because normal site-packages is not writeable


In [3]:
import pandas as pd
from pyspark.sql.functions import col, count, isnan, when, mean as _mean, stddev as _stddev

# get data
total_data = 'world-surface.xlsx'
developed_data = 'Developed.xlsx'
developing_data = 'Developing.xlsx'

df_pandas = pd.read_excel(total_data)
df_developed_pandas = pd.read_excel(developed_data)
df_developing_pandas = pd.read_excel(developing_data)

# pre
df_pandas['Country'] = df_pandas['Country'].astype(str)

# convert pandas DataFrame into Spark DataFrame
df_spark = spark.createDataFrame(df_pandas)
df_developed_spark = spark.createDataFrame(df_developed_pandas)
df_developing_spark = spark.createDataFrame(df_developing_pandas)

# explore data
df_spark.printSchema()
df_spark.describe().show()
df_developed_spark.describe().show()
df_developing_spark.describe().show()

# null
null_counts = df_spark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_spark.columns])

# total
total_count = df_spark.count()

# couunt of non-null
non_null_counts = null_counts.select([(total_count - col(c)).alias(c) for c in null_counts.columns])

# type
data_types = df_spark.dtypes

# null count table
summary_table = non_null_counts.toPandas().transpose().reset_index()
summary_table.columns = ['Column', 'Non-Null Count']
null_counts_pd = null_counts.toPandas().transpose().reset_index()
null_counts_pd.columns = ['Column', 'Null Count']
summary_table = summary_table.merge(null_counts_pd, on='Column')
summary_table['Data Type'] = [dtype for _, dtype in data_types]

print(summary_table)

root
 |-- Country: string (nullable = true)
 |-- GDP: double (nullable = true)
 |-- Gross primary education enrollment (%): double (nullable = true)
 |-- Gross tertiary education enrollment (%): double (nullable = true)
 |-- Out of pocket health expenditure: double (nullable = true)
 |-- Physicians per thousand: double (nullable = true)
 |-- Density\n(P/Km2): double (nullable = true)
 |-- Fertility Rate: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- Urban_population: double (nullable = true)
 |-- Unemployment rate: double (nullable = true)
 |-- Population: Labor force participation (%): double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Tax revenue (%): double (nullable = true)
 |-- Total tax rate: double (nullable = true)

+-------+-----------+-----------+--------------------------------------+---------------------------------------+--------------------------------+-----------------------+----------------+--------------+----------+--------------

                                       Column  Non-Null Count  Null Count  \
0                                     Country             199           1   
1                                         GDP             197           3   
2      Gross primary education enrollment (%)             192           8   
3     Gross tertiary education enrollment (%)             187          13   
4            Out of pocket health expenditure             192           8   
5                     Physicians per thousand             192           8   
6                            Density\n(P/Km2)             199           1   
7                              Fertility Rate             192           8   
8                                  Population             198           2   
9                            Urban_population             194           6   
10                          Unemployment rate             180          20   
11  Population: Labor force participation (%)             180          20   

In [4]:
# 定义函数来检查数据
def audit_data(df_spark):
    # 创建一个列表来存储结果
    result = []

    # 获取数据框架的总行数
    total_count = df_spark.count()

    # 检查每一列
    for column, dtype in df_spark.dtypes:
        if dtype in ['int', 'double']:
            # 计算极值
            quantiles = df_spark.approxQuantile(column, [0.05, 0.95], 0.0)
            q05, q95 = quantiles
            extreme_value = df_spark.filter((col(column) > q95) | (col(column) < q05)).count()
            
            # 计算离群值
            stats = df_spark.select(_mean(col(column)).alias('mean'), _stddev(col(column)).alias('stddev')).first()
            mean_val = stats['mean']
            stddev_val = stats['stddev']
            outlier = df_spark.filter((col(column) > mean_val + 3 * stddev_val) | (col(column) < mean_val - 3 * stddev_val)).count()
        else:
            extreme_value = None
            outlier = None

        # 计算空值数量
        null_value = df_spark.filter(col(column).isNull()).count()

        # 计算空字符串和空白字符串的数量
        if dtype == 'string':
            empty_string = df_spark.filter(col(column) == '').count()
            white_space = df_spark.filter(col(column) == ' ').count()
        else:
            empty_string = None
            white_space = None

        # 将结果添加到列表中
        result.append([column, extreme_value, outlier, null_value, empty_string, white_space])

    # 将列表转换为pandas DataFrame
    result_df = pd.DataFrame(result, columns=['Field', 'Extreme Value', 'Outlier', 'Null Value', 'Empty String', 'White Space'])
    return result_df

# 运行数据审计函数
audit_result = audit_data(df_spark)

# 显示结果
print(audit_result)

                                        Field  Extreme Value  Outlier  \
0                                     Country            NaN      NaN   
1                                         GDP           21.0    197.0   
2      Gross primary education enrollment (%)           26.0    192.0   
3     Gross tertiary education enrollment (%)           30.0    187.0   
4            Out of pocket health expenditure           26.0    192.0   
5                     Physicians per thousand           25.0    192.0   
6                            Density\n(P/Km2)           15.0    199.0   
7                              Fertility Rate           26.0    192.0   
8                                  Population           20.0    198.0   
9                            Urban_population           24.0    194.0   
10                          Unemployment rate           37.0    180.0   
11  Population: Labor force participation (%)           37.0    180.0   
12                                        CPI      