In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import count
from pyspark.sql.types import StringType, IntegerType, TimestampType, LongType
from pyspark.sql.types import ArrayType, FloatType, BooleanType, DateType
from pyspark.sql.types import NumericType

# Obtain the Spark session for this notebook under the application name
# "Data_stats" (getOrCreate reuses an already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("Data_stats")
    .getOrCreate()
)

In [None]:
# Load the CSV file to be profiled.
# NOTE(review): no `header` / `inferSchema` options are passed, so the reader's
# documented defaults apply (first line treated as data, every column read as a
# string) -- confirm this is intended for "file.csv".
df = spark.read.format("csv").load("file.csv")

In [None]:
# Build a profiling report for `df`: one row per metric, labelled by a
# "summary" column, followed by one string-typed column per source column.
# Spark functions are accessed through the `F` namespace so that Python's
# builtin max / min / round are never picked up by mistake.

# Separate the column names by data type.
string_columns = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
]

# NumericType is the shared base class of Spark's integral, fractional and
# decimal types, so float/double/short/byte columns are profiled as well.
numeric_columns = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, NumericType)
]

date_columns = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, (DateType, TimestampType))
]

# Data types of all columns.
col_dtype_list = [dict(df.dtypes)]

dtype_df = spark.createDataFrame(col_dtype_list)
dtype_df = dtype_df.select(F.lit("data_types").alias("summary"), "*")

# Zero-row placeholder used for a metric group that has no applicable columns.
# Calling df.select([]) instead would return one row per input row and flood
# the report with label-only rows.
no_metrics_df = dtype_df.select("summary").limit(0)

# Number of null values per column (cast first, then alias, so the output
# column keeps the plain source-column name).
null_df = df.select(
    [
        F.count(F.when(F.col(c).isNull(), 1)).cast("string").alias(c)
        for c in df.columns
    ]
)
null_df = null_df.select(F.lit("number_of_null_values").alias("summary"), "*")

# Row count, repeated for every column.
total_rows = F.count(F.lit(1))

row_count_df = df.select(
    [total_rows.cast("string").alias(c) for c in df.columns]
)
row_count_df = row_count_df.select(F.lit("row_count").alias("summary"), "*")

# Percentage of null values per column, rounded to a whole number.  Computed in
# a single aggregation over `df` (no join needed); the `when` guard yields null
# instead of dividing by zero when `df` has no rows.
null_percentage_df = df.select(
    [
        F.round(
            F.when(
                total_rows > 0,
                F.count(F.when(F.col(c).isNull(), 1)) * 100 / total_rows,
            )
        ).cast("string").alias(c)
        for c in df.columns
    ]
)
null_percentage_df = null_percentage_df.select(
    F.lit("percentage_of_null_values").alias("summary"), "*"
)

# Numeric column metrics: min and max values (summary() already returns strings
# and its own "summary" label column).
if numeric_columns:
    numeric_df = df.select(*numeric_columns).summary("min", "max")
else:
    numeric_df = no_metrics_df

# String column metrics.  Every value is cast to string so these rows have the
# same column types as the other report rows before they are unioned.
if string_columns:
    string_df_1 = df.select(
        [F.max(F.length(c)).cast("string").alias(c) for c in string_columns]
    )  # max length
    string_df_1 = string_df_1.select(F.lit("max_length").alias("summary"), "*")

    string_df_2 = df.select(
        [F.min(F.length(c)).cast("string").alias(c) for c in string_columns]
    )  # min length
    string_df_2 = string_df_2.select(F.lit("min_length").alias("summary"), "*")

    string_df_3 = df.select(
        [F.countDistinct(c).cast("string").alias(c) for c in string_columns]
    )  # count of distinct (categorical) values
    string_df_3 = string_df_3.select(
        F.lit("number_of_categorical_values").alias("summary"), "*"
    )

    string_df = string_df_1.union(string_df_2).union(string_df_3)
else:
    string_df = no_metrics_df

# Date / timestamp column metrics.
if date_columns:
    date_df_1 = df.select(
        [F.max(c).cast("string").alias(c) for c in date_columns]
    )  # max date
    date_df_1 = date_df_1.select(F.lit("max_date").alias("summary"), "*")

    date_df_2 = df.select(
        [F.min(c).cast("string").alias(c) for c in date_columns]
    )  # min date
    date_df_2 = date_df_2.select(F.lit("min_date").alias("summary"), "*")

    date_df = date_df_1.union(date_df_2)
else:
    date_df = no_metrics_df

# Add the columns each group is missing (as null strings) so that every metric
# DataFrame has the full column set required by unionByName.
for column in [column for column in dtype_df.columns if column not in string_df.columns]:
    string_df = string_df.withColumn(column, F.lit(None).cast("string"))

for column in [column for column in dtype_df.columns if column not in numeric_df.columns]:
    numeric_df = numeric_df.withColumn(column, F.lit(None).cast("string"))

for column in [column for column in dtype_df.columns if column not in date_df.columns]:
    date_df = date_df.withColumn(column, F.lit(None).cast("string"))

# Combine all metrics into the final report.
final_df = (
    dtype_df
    .unionByName(row_count_df)
    .unionByName(null_df)
    .unionByName(null_percentage_df)
    .unionByName(numeric_df)
    .unionByName(string_df)
    .unionByName(date_df)
)

# `return` is a SyntaxError outside a function; leaving the result as the last
# expression lets a notebook cell display it.
final_df