In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 50.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=796d5fe21fed7895a989bda1fcd0d5ae53e87b21c1c59c28600cecfb50dd2ca2
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:
# NOTE(review): wildcard import; this cell itself only needs SparkSession.
from pyspark.sql import *

# Reuse an already-running session if one exists, otherwise create one
# registered under the application name 'Sampling'.
spark = (
    SparkSession.builder
    .appName('Sampling')
    .getOrCreate()
)

In [4]:
# Read the CSV, taking column names from the header row and letting Spark
# infer each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("us-counties.txt")
)

In [5]:
# Total number of rows loaded from the CSV.
df.count()

758243

In [6]:
# Show the column names and the types Spark inferred for them.
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)



In [7]:
# Peek at the first row to sanity-check the parsed values.
df.first()

Row(date=datetime.datetime(2020, 1, 21, 0, 0), county='Snohomish', state='Washington', fips=53061, cases=1, deaths=0)

In [8]:
# Fixed-fraction sampling: request roughly 0.00001 of the rows.
# No seed is supplied, so the rows returned can differ from run to run, and
# the number of rows is approximate rather than exact.
df.sample(fraction=0.00001).show()

+-------------------+--------------------+--------------+-----+-----+------+
|               date|              county|         state| fips|cases|deaths|
+-------------------+--------------------+--------------+-----+-----+------+
|2020-04-26 00:00:00|              Oldham|         Texas|48359|    3|     1|
|2020-05-09 00:00:00|              Marion|      Missouri|29127|    5|     0|
|2020-05-11 00:00:00|               Union|       Florida|12125|    7|     0|
|2020-05-12 00:00:00|           Christian|      Illinois|17021|   30|     5|
|2020-07-02 00:00:00|          Tippecanoe|       Indiana|18157|  662|     9|
|2020-08-23 00:00:00|               Delta|      Colorado| 8029|  140|     1|
|2020-09-10 00:00:00|             Unknown|     Louisiana| null| 1126|   170|
|2020-09-29 00:00:00|              Warren|          Iowa|19181|  969|     6|
|2020-10-26 00:00:00|                Dare|North Carolina|37055|  367|     3|
|2020-11-04 00:00:00|Skagway Municipality|        Alaska| 2230|    7|     0|

In [9]:
# Fixed-fraction sampling with an explicit seed (3455) so the draw is repeatable.
df.sample(fraction=0.00001, seed=3455).show()

+-------------------+--------------------+--------------+-----+------+------+
|               date|              county|         state| fips| cases|deaths|
+-------------------+--------------------+--------------+-----+------+------+
|2020-04-18 00:00:00|            Kittitas|    Washington|53037|    14|     0|
|2020-07-11 00:00:00|                Reno|        Kansas|20155|   123|     0|
|2020-08-09 00:00:00|              Oconee|South Carolina|45073|   827|     9|
|2020-09-16 00:00:00|              Cassia|         Idaho|16031|   616|     5|
|2020-09-25 00:00:00|           Chattooga|       Georgia|13055|   757|    18|
|2020-10-02 00:00:00|              Kimble|         Texas|48267|    16|     1|
|2020-10-29 00:00:00|            Maricopa|       Arizona| 4013|156736|  3566|
|2020-11-16 00:00:00|             Kimball|      Nebraska|31105|   134|     0|
|2020-11-21 00:00:00|Prince of Wales-H...|        Alaska| 2198|    44|     1|
+-------------------+--------------------+--------------+-----+-

In [10]:
# Repeat the sample with the identical fraction and seed to show that the
# same rows come back.
df.sample(fraction=0.00001, seed=3455).show()

+-------------------+--------------------+--------------+-----+------+------+
|               date|              county|         state| fips| cases|deaths|
+-------------------+--------------------+--------------+-----+------+------+
|2020-04-18 00:00:00|            Kittitas|    Washington|53037|    14|     0|
|2020-07-11 00:00:00|                Reno|        Kansas|20155|   123|     0|
|2020-08-09 00:00:00|              Oconee|South Carolina|45073|   827|     9|
|2020-09-16 00:00:00|              Cassia|         Idaho|16031|   616|     5|
|2020-09-25 00:00:00|           Chattooga|       Georgia|13055|   757|    18|
|2020-10-02 00:00:00|              Kimble|         Texas|48267|    16|     1|
|2020-10-29 00:00:00|            Maricopa|       Arizona| 4013|156736|  3566|
|2020-11-16 00:00:00|             Kimball|      Nebraska|31105|   134|     0|
|2020-11-21 00:00:00|Prince of Wales-H...|        Alaska| 2198|    44|     1|
+-------------------+--------------------+--------------+-----+-

In [11]:
# Sampling with replacement: the same row may be drawn more than once.
(
    df.select("state")
    .sample(withReplacement=True, fraction=0.00001, seed=1)
    .show()
)
# Maryland appears twice in the output. Only the state column is selected,
# so a repeated value may also come from two different rows of that state.

+------------+
|       state|
+------------+
|   Wisconsin|
|     Vermont|
|       Texas|
|   Louisiana|
|      Kansas|
|Pennsylvania|
|  California|
|    Arkansas|
|    Maryland|
|     Indiana|
| Connecticut|
|    Maryland|
+------------+



In [12]:
# Sampling without replacement: each row can be drawn at most once.
(
    df.select("state")
    .sample(withReplacement=False, fraction=0.00001, seed=1)
    .show()
)

+---------+
|    state|
+---------+
| Maryland|
|Wisconsin|
|    Texas|
| Illinois|
+---------+



In [13]:
#Reducing skew in Sampling
from pyspark.sql.functions import desc

In [14]:
# Number of rows per state, largest first, to see how uneven the groups are.
(
    df.groupBy("state")
    .count()
    .alias("cnt")
    .sort(desc("cnt.count"))
    .show()
)

+--------------+-----+
|         state|count|
+--------------+-----+
|         Texas|57096|
|       Georgia|38969|
|      Virginia|31699|
|      Kentucky|28132|
|      Missouri|26770|
|      Illinois|24258|
|North Carolina|24108|
|          Iowa|23370|
|     Tennessee|23167|
|        Kansas|22727|
|       Indiana|22482|
|          Ohio|21410|
|     Minnesota|20627|
|      Michigan|20094|
|   Mississippi|20056|
|      Nebraska|18972|
|      Arkansas|18220|
|      Oklahoma|18038|
|     Wisconsin|17327|
|       Florida|16846|
+--------------+-----+
only showing top 20 rows



In [15]:
# Per-state row counts, sorted in descending order of count.
df1 = (
    df.groupBy("state")
    .count()
    .alias("cnt")
    .sort(desc("cnt.count"))
)

In [16]:
from pyspark.sql.functions import lit, when, col

In [17]:
# Attach a per-state sampling fraction: states with more than 16000 rows get
# the smaller fraction (0.00001); every other state gets 0.0001.
df2 = df1.withColumn(
    "samplingratio",
    when(col("count") > 16000, lit(0.00001)).otherwise(lit(0.0001)),
)

In [19]:
# Bring the (state, samplingratio) pairs back to the driver as a Python dict
# keyed by state.
fractions = (
    df2.select("state", "samplingratio")
    .rdd
    .collectAsMap()
)

In [20]:
# Inspect the state -> sampling-fraction mapping.
print(fractions)

{'Texas': 1e-05, 'Georgia': 1e-05, 'Virginia': 1e-05, 'Kentucky': 1e-05, 'Missouri': 1e-05, 'Illinois': 1e-05, 'North Carolina': 1e-05, 'Iowa': 1e-05, 'Tennessee': 1e-05, 'Kansas': 1e-05, 'Indiana': 1e-05, 'Ohio': 1e-05, 'Minnesota': 1e-05, 'Michigan': 1e-05, 'Mississippi': 1e-05, 'Nebraska': 1e-05, 'Arkansas': 1e-05, 'Oklahoma': 1e-05, 'Wisconsin': 1e-05, 'Florida': 1e-05, 'Pennsylvania': 1e-05, 'Alabama': 1e-05, 'Louisiana': 0.0001, 'Puerto Rico': 0.0001, 'Colorado': 0.0001, 'New York': 0.0001, 'California': 0.0001, 'South Dakota': 0.0001, 'West Virginia': 0.0001, 'North Dakota': 0.0001, 'South Carolina': 0.0001, 'Montana': 0.0001, 'Washington': 0.0001, 'Idaho': 0.0001, 'Oregon': 0.0001, 'New Mexico': 0.0001, 'Utah': 0.0001, 'Maryland': 0.0001, 'New Jersey': 0.0001, 'Alaska': 0.0001, 'Wyoming': 0.0001, 'Maine': 0.0001, 'Massachusetts': 0.0001, 'Arizona': 0.0001, 'Vermont': 0.0001, 'Nevada': 0.0001, 'New Hampshire': 0.0001, 'Connecticut': 0.0001, 'Rhode Island': 0.0001, 'Hawaii': 0.00

In [21]:
# Stratified sample: rows are sampled per state using that state's fraction
# from `fractions`, with seed 1 for repeatability.
sampledDF = df.sampleBy("state", fractions=fractions, seed=1)
sampledDF.show(n=100)

+-------------------+----------+--------------+-----+-----+------+
|               date|    county|         state| fips|cases|deaths|
+-------------------+----------+--------------+-----+-----+------+
|2020-03-11 00:00:00|   Norfolk| Massachusetts|25021|   22|     0|
|2020-04-22 00:00:00|San Miguel|      Colorado| 8113|   17|     0|
|2020-04-26 00:00:00|St. Mary's|      Maryland|24037|  133|     6|
|2020-04-29 00:00:00|   Harford|      Maryland|24025|  378|    13|
|2020-05-14 00:00:00| Baltimore|      Maryland|24005| 4306|   223|
|2020-05-20 00:00:00| Anchorage|        Alaska| 2020|  207|     4|
|2020-05-22 00:00:00|     Delta|      Colorado| 8029|   61|     1|
|2020-05-24 00:00:00| Riverside|    California| 6065| 6464|   299|
|2020-05-31 00:00:00|  Umatilla|        Oregon|41059|  116|     3|
|2020-06-16 00:00:00|     Grand|          Utah|49019|   12|     0|
|2020-06-16 00:00:00| Marquette|     Wisconsin|55077|    9|     1|
|2020-06-27 00:00:00|    Skagit|    Washington|53057|  502|   