In [1]:
from __future__ import print_function

from pyspark import SparkContext

from pyspark.mllib.linalg import Matrices, Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.mllib.random import RandomRDDs

import numpy as np
import math as math

In [2]:
spark = SparkSession.builder \
    .master('local') \
    .appName('Python Spark SQL Chi-square test example') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "6") \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
# Generate a random double RDD that contains 1 million i.i.d. values drawn from the
# standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
n = 1000000
x = RandomRDDs.normalRDD(sc, n, 10)
stats = x.stats()
stats.count()
# Apply a transform to get a random double RDD following `N(5, 1)`.
mu, sigma = 5, 1 # mean and standard deviation
v = x.map(lambda x: mu + sigma * x)
stats = v.stats()
stats.count()
v_min = v.min()
v_max = v.max()

In [4]:
v_min

0.30445431077755547

In [5]:
v_max

9.890554969102546

In [6]:
# observed relative frequencies of input data
bins_number = 10
obs_counts = v.histogram(bins_number)
observed = [x / n for x in obs_counts[1]]

In [7]:
observed
#type(observed)

[9.1e-05,
 0.00261,
 0.03157,
 0.159722,
 0.344688,
 0.315381,
 0.123871,
 0.020604,
 0.001423,
 4e-05]

In [8]:
# theoretical relative frequencies of expected distribution
h = (v_max - v_min) / bins_number
xi = []

for i in range(0,bins_number): xi.append(obs_counts[0][i] + h / 2 )
#xi   
ti = [(x - mu) / sigma for x in xi]
#ti
phiti = [(1 / math.sqrt( 2*math.pi )) * math.pow( math.e, -math.pow(x,2) / 2 ) for x in ti]
#phiti
ni = [(h * n / sigma) * x for x in phiti]
#ni

In [9]:
# expected relative frequencies (expected distribution) of input data
expected = [x / n for x in ni]
expected
#type(expected)

[5.277082583802485e-05,
 0.0018973673537105403,
 0.027215813081343757,
 0.1557412155290475,
 0.3555479369816186,
 0.3238210844142218,
 0.11765880665212952,
 0.01705517234557427,
 0.0009862798085400004,
 2.2753935814106393e-05]

In [10]:
# Оценим с помощью хи-критерия соответствие распределения
#ChiSqTestResult chiSqTest(Vector observed, Vector expected)
goodnessOfFitTestResult = Statistics.chiSqTest(Vectors.dense(observed), Vectors.dense(expected))

print("%s\n" % goodnessOfFitTestResult)

Chi squared test summary:
method: pearson
degrees of freedom = 9 
statistic = 0.0029182863527482516 
pValue = 0.9999999999999967 
No presumption against null hypothesis: observed follows the same distribution as expected..



In [11]:
 spark.stop()