In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Legislators") \
    .master("spark://spark-master:7077") \
    .config("spark.jars", "/home/jovyan/jars/postgresql-42.7.2.jar") \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://postgres:5432/sql_book_o_reilly"
properties = {
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver"
}

df_legislators = spark.read.jdbc(url=jdbc_url, table="legislators", properties=properties)
df_legislators_terms = spark.read.jdbc(url=jdbc_url, table="legislators_terms", properties=properties)

In [2]:
df_legislators.createOrReplaceTempView("legislators")
df_legislators_terms.createOrReplaceTempView("legislators_terms")

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [4]:
df_legislators.show(3)

+------------------+----------+---------+-----------+--------+------+---------------+------------------+----------------+----------+------+-----------+----------------------+-----------+--------+--------------+-----------+-------------------+----------------+--------------------------+---------+--------+------------+------+--------------+--------------+---------+---------+--------+
|         full_name|first_name|last_name|middle_name|nickname|suffix|other_names_end|other_names_middle|other_names_last|  birthday|gender|id_bioguide|id_bioguide_previous_0|id_govtrack|id_icpsr|  id_wikipedia|id_wikidata|id_google_entity_id|id_house_history|id_house_history_alternate|id_thomas|id_cspan|id_votesmart|id_lis|id_ballotpedia|id_opensecrets| id_fec_0| id_fec_1|id_fec_2|
+------------------+----------+---------+-----------+--------+------+---------------+------------------+----------------+----------+------+-----------+----------------------+-----------+--------+--------------+-----------+--------

In [5]:
df_legislators_terms.show(3)

+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+-------+-----+----+------------+------+----------+-------+------+
|id_bioguide|term_number|  term_id|term_type|term_start|  term_end|state|district|class|   party| how| url|address|phone| fax|contact_form|office|state_rank|rss_url|caucus|
+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+-------+-----+----+------------+------+----------+-------+------+
|    B000944|          0|B000944-0|      rep|1993-01-05|1995-01-03|   OH|      13| NULL|Democrat|NULL|NULL|   NULL| NULL|NULL|        NULL|  NULL|      NULL|   NULL|  NULL|
|    C000127|          0|C000127-0|      rep|1993-01-05|1995-01-03|   WA|       1| NULL|Democrat|NULL|NULL|   NULL| NULL|NULL|        NULL|  NULL|      NULL|   NULL|  NULL|
|    C000141|          0|C000141-0|      rep|1987-01-06|1989-01-03|   MD|       3| NULL|Democrat|NULL|NULL|   NULL| NULL|NULL|        N

### First term of each legislator

In [6]:
df_legislators_terms.groupBy('id_bioguide').agg(F.min('term_start').alias('first_term')).show(3)

+-----------+----------+
|id_bioguide|first_term|
+-----------+----------+
|    M001163|2005-03-10|
|    P000604|2012-11-15|
|    C001115|2018-07-10|
+-----------+----------+
only showing top 3 rows



In [7]:
spark.sql("SELECT " \
" id_bioguide, " \
"min(term_start) as first_term " \
"FROM legislators_terms " \
"GROUP BY 1").show(3)

+-----------+----------+
|id_bioguide|first_term|
+-----------+----------+
|    M001163|2005-03-10|
|    P000604|2012-11-15|
|    C001115|2018-07-10|
+-----------+----------+
only showing top 3 rows



### Cohort retained

In [8]:
first_terms = df_legislators_terms.groupBy('id_bioguide').agg(F.min('term_start').alias('first_term'))

joined = (first_terms.alias("a").join(df_legislators_terms.alias("b"), F.col("a.id_bioguide") == F.col("b.id_bioguide"), "inner"))

joined = joined.withColumn(
    "period",
    F.floor(F.months_between(F.col("b.term_start"), F.col("a.first_term")) / 12)
)

In [9]:
first_terms.show(3)

+-----------+----------+
|id_bioguide|first_term|
+-----------+----------+
|    M001163|2005-03-10|
|    P000604|2012-11-15|
|    C001115|2018-07-10|
+-----------+----------+
only showing top 3 rows



In [10]:
joined.show(3)

+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+--------------------+--------------------+------------+----+------------+--------------------+----------+-------+------+------+
|id_bioguide|first_term|id_bioguide|term_number|  term_id|term_type|term_start|  term_end|state|district|class|     party| how|                 url|             address|       phone| fax|contact_form|              office|state_rank|rss_url|caucus|period|
+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+--------------------+--------------------+------------+----+------------+--------------------+----------+-------+------+------+
|    M001163|2005-03-10|    M001163|          0|M001163-0|      rep|2005-03-10|2007-01-03|   CA|       5| NULL|  Democrat|NULL|                NULL|                NULL|        NULL|NULL|        NULL|                NULL|      NULL|   

In [11]:
cohort_retained_df = (
    joined.groupBy("period")
    .agg(F.countDistinct("a.id_bioguide").alias("cohort_retained"))
    .orderBy("period")
)

cohort_retained_df.show(3)

+------+---------------+
|period|cohort_retained|
+------+---------------+
|     0|          12518|
|     1|           3600|
|     2|           3619|
+------+---------------+
only showing top 3 rows



In [12]:
spark.sql("""
    SELECT 
        floor(months_between(b.term_start, a.first_term) / 12) as period,
        count(distinct a.id_bioguide) as cohort_retained
    FROM
    (
        SELECT 
            id_bioguide, 
            min(term_start) as first_term
        FROM legislators_terms
        GROUP BY 1
    ) a
    JOIN legislators_terms b ON a.id_bioguide = b.id_bioguide
    GROUP BY 1
    ORDER BY 1;
""").show(3)


+------+---------------+
|period|cohort_retained|
+------+---------------+
|     0|          12518|
|     1|           3600|
|     2|           3619|
+------+---------------+
only showing top 3 rows



### calculate the total cohort_size and populate it in each row so that the cohort_retained can be divided by it.

In [13]:
Window_f  = Window.orderBy('period')

cohort_size_df = cohort_retained_df.withColumn('cohort_size', F.first_value('cohort_retained').over(Window_f))
cohort_size_df.show(3)

+------+---------------+-----------+
|period|cohort_retained|cohort_size|
+------+---------------+-----------+
|     0|          12518|      12518|
|     1|           3600|      12518|
|     2|           3619|      12518|
+------+---------------+-----------+
only showing top 3 rows



In [14]:
cohort_pct_df = cohort_size_df.withColumn('pct_retained',F.try_divide("cohort_retained","cohort_size"))
cohort_pct_df.show(3)

+------+---------------+-----------+-------------------+
|period|cohort_retained|cohort_size|       pct_retained|
+------+---------------+-----------+-------------------+
|     0|          12518|      12518|                1.0|
|     1|           3600|      12518| 0.2875858763380732|
|     2|           3619|      12518|0.28910369068541303|
+------+---------------+-----------+-------------------+
only showing top 3 rows



In [15]:
spark.sql("""
    SELECT
          period,
          first_value(cohort_retained) over(order by period) as cohort_size,
          cohort_retained,
          cohort_retained / (first_value(cohort_retained) over(order by period)) as pct_retained
    FROM
    (SELECT 
        floor(months_between(b.term_start, a.first_term) / 12) as period,
        count(distinct a.id_bioguide) as cohort_retained
    FROM
        (SELECT 
            id_bioguide,
            min(term_start) as first_term
        FROM legislators_terms
        GROUP BY 1) a
    JOIN legislators_terms b 
    ON a.id_bioguide = b.id_bioguide
    GROUP BY 1) aa
""").show(3)

+------+-----------+---------------+-------------------+
|period|cohort_size|cohort_retained|       pct_retained|
+------+-----------+---------------+-------------------+
|     0|      12518|          12518|                1.0|
|     1|      12518|           3600| 0.2875858763380732|
|     2|      12518|           3619|0.28910369068541303|
+------+-----------+---------------+-------------------+
only showing top 3 rows



### We can take the cohort retention result and reshape the data to show it in table format. Pivot and flatten the results

In [16]:
pivot_df = cohort_pct_df.groupBy("cohort_size").agg(
    F.max(F.when(F.col("period") == 0, F.col("pct_retained"))).alias("yr0"),
    F.max(F.when(F.col("period") == 1, F.col("pct_retained"))).alias("yr1"),
    F.max(F.when(F.col("period") == 2, F.col("pct_retained"))).alias("yr2"),
    F.max(F.when(F.col("period") == 3, F.col("pct_retained"))).alias("yr3"),
    F.max(F.when(F.col("period") == 4, F.col("pct_retained"))).alias("yr4")
).show()

+-----------+---+------------------+-------------------+----------------+------------------+
|cohort_size|yr0|               yr1|                yr2|             yr3|               yr4|
+-----------+---+------------------+-------------------+----------------+------------------+
|      12518|1.0|0.2875858763380732|0.28910369068541303|0.14626937210417|0.2564307397347819|
+-----------+---+------------------+-------------------+----------------+------------------+



In [17]:
spark.sql("""
    SELECT 
          cohort_size,
          max(case when period = 0 then pct_retained end) as yr0,
          max(case when period = 1 then pct_retained end) as yr1,
          max(case when period = 2 then pct_retained end) as yr2,
          max(case when period = 3 then pct_retained end) as yr3,
          max(case when period = 4 then pct_retained end) as yr4
    FROM
          (
            SELECT 
                  period,
                  first_value(cohort_retained) over (order by period) as cohort_size,
                  cohort_retained / first_value(cohort_retained) over (order by period) as pct_retained
            FROM
                  (
                  SELECT
                        floor(months_between(b.term_start, a.first_term) / 12) as period,
                        count(*) as cohort_retained
                  FROM
                        (
                        SELECT 
                              id_bioguide, min(term_start) as first_term
                              FROM legislators_terms
                        GROUP BY 1
                        ) a
            JOIN legislators_terms b on a.id_bioguide = b.id_bioguide
            GROUP BY 1
            ) aa
      ) aaa
      GROUP BY 1;
""").show()

+-----------+---+------------------+------------------+-------------------+-------------------+
|cohort_size|yr0|               yr1|               yr2|                yr3|                yr4|
+-----------+---+------------------+------------------+-------------------+-------------------+
|      12647|1.0|0.2847315568909623|0.2865501700007907|0.14501462797501383|0.25397327429429906|
+-----------+---+------------------+------------------+-------------------+-------------------+



### Adjusting Time Series to Increase Retention Accuracy

In [18]:
date_dim_df = spark.read.jdbc(url=jdbc_url, table="date_dim", properties=properties)
date_dim_df.show(3)

+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+
|      date|date_key|day_of_month|day_of_year|day_of_week| day_name|day_short_name|week_number|week_of_month|      week|month_number|month_name|month_short_name|first_day_of_month|last_day_of_month|quarter_number|quarter_name|first_day_of_quarter|last_day_of_quarter|year|decade|century|
+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+
|1770-01-01|17700101|           1|          1|          1|   Monday|           Mon|          1|            1|1770-01-01|           1|   

In [19]:
date_dim_df.createOrReplaceTempView('date_dim')

In [20]:
first_term_df = df_legislators_terms.groupBy('id_bioguide').agg(F.min('term_start').alias('first_term'))
first_term_df.show(2)

+-----------+----------+
|id_bioguide|first_term|
+-----------+----------+
|    M001163|2005-03-10|
|    P000604|2012-11-15|
+-----------+----------+
only showing top 2 rows



In [21]:
joined_first_term_df = first_term_df.join(
    df_legislators_terms.alias("b"),
    on="id_bioguide",
    how="inner"
)
joined_first_term_df.show(2)

+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+--------------------+------------+----+------------+--------------------+----------+-------+------+
|id_bioguide|first_term|term_number|  term_id|term_type|term_start|  term_end|state|district|class|   party| how| url|             address|       phone| fax|contact_form|              office|state_rank|rss_url|caucus|
+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+--------------------+------------+----+------------+--------------------+----------+-------+------+
|    M001163|2005-03-10|          0|M001163-0|      rep|2005-03-10|2007-01-03|   CA|       5| NULL|Democrat|NULL|NULL|                NULL|        NULL|NULL|        NULL|                NULL|      NULL|   NULL|  NULL|
|    P000604|2012-11-15|          0|P000604-0|      rep|2012-11-15|2013-01-03|   NJ|      10| NULL|Democrat|NULL|NULL|2310 Raybu

In [22]:
first_term_and_date_dim_df = joined_first_term_df.join(
    date_dim_df.alias("c"),
    on=[
        (F.col("c.date").between(F.col("b.term_start"), F.col("b.term_end"))),
        (F.col("c.month_name") == "December"),
        (F.col("c.day_of_month") == 31)
    ],
    how="left"
)
first_term_and_date_dim_df.show(2)

+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+-------+-----+----+------------+------+----------+-------+------+----------+--------+------------+-----------+-----------+--------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+
|id_bioguide|first_term|term_number|  term_id|term_type|term_start|  term_end|state|district|class|   party| how| url|address|phone| fax|contact_form|office|state_rank|rss_url|caucus|      date|date_key|day_of_month|day_of_year|day_of_week|day_name|day_short_name|week_number|week_of_month|      week|month_number|month_name|month_short_name|first_day_of_month|last_day_of_month|quarter_number|quarter_name|first_day_of_quarter|last_day_of_quarter|year|decade|century|
+-----------+----------+-----------+---------+---------+------

In [23]:
cohort_retained_df = first_term_and_date_dim_df.withColumn(
    "period",
    F.coalesce(
        (F.months_between(F.col("date"), F.col("first_term")) / 12).cast("int"),
        F.lit(0)
    )
).groupBy("period").agg(F.countDistinct("id_bioguide").alias("cohort_retained"))
cohort_retained_df.orderBy('period').show(3)

+------+---------------+
|period|cohort_retained|
+------+---------------+
|     0|          12518|
|     1|          12328|
|     2|           8166|
+------+---------------+
only showing top 3 rows



In [24]:
Window_f = Window.orderBy('period')
cohort_size_df = cohort_retained_df.withColumn('cohort_size',F.first_value('cohort_retained').over(Window_f).alias('cohort_size'))
cohort_pct_df = cohort_size_df.withColumn('pct_retained', F.col('cohort_retained')/F.col('cohort_size'))
cohort_pct_df.show(3)

+------+---------------+-----------+------------------+
|period|cohort_retained|cohort_size|      pct_retained|
+------+---------------+-----------+------------------+
|     0|          12518|      12518|               1.0|
|     1|          12328|      12518|0.9848218565266017|
|     2|           8166|      12518|0.6523406294935293|
+------+---------------+-----------+------------------+
only showing top 3 rows



In [25]:
spark.sql("""
SELECT 
          period,
          first_value(cohort_retained) over (order by period) as cohort_size,
          cohort_retained,
          cohort_retained * 1.0 / first_value(cohort_retained) over (order by period) as pct_retained
FROM
        (
        SELECT 
          coalesce(floor(months_between(c.date, a.first_term) / 12), 0) as period,
          count(distinct a.id_bioguide) as cohort_retained
        FROM
                (
                SELECT 
                        id_bioguide, 
                        min(term_start) as first_term
                FROM legislators_terms
                GROUP BY 1
                ) a
        JOIN legislators_terms b on a.id_bioguide = b.id_bioguide
        LEFT JOIN date_dim c on c.date between b.term_start and b.term_end
        and c.month_name = 'December' and c.day_of_month = 31
        GROUP BY 1
        ) aa
;

        """).show(3)

+------+-----------+---------------+------------------+
|period|cohort_size|cohort_retained|      pct_retained|
+------+-----------+---------------+------------------+
|     0|      12518|          12518|1.0000000000000000|
|     1|      12518|          12328|0.9848218565266017|
|     2|      12518|           8166|0.6523406294935293|
+------+-----------+---------------+------------------+
only showing top 3 rows



### If the data set does not contain an end date, there are a couple of options for imputing one. One option is to add a fixed interval to the start date, when the length of a subscription or term is known.

In [26]:
joined_first_term_df.show(3)

+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+--------------------+--------------------+------------+----+------------+--------------------+----------+-------+------+
|id_bioguide|first_term|term_number|  term_id|term_type|term_start|  term_end|state|district|class|     party| how|                 url|             address|       phone| fax|contact_form|              office|state_rank|rss_url|caucus|
+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+--------------------+--------------------+------------+----+------------+--------------------+----------+-------+------+
|    M001163|2005-03-10|          0|M001163-0|      rep|2005-03-10|2007-01-03|   CA|       5| NULL|  Democrat|NULL|                NULL|                NULL|        NULL|NULL|        NULL|                NULL|      NULL|   NULL|  NULL|
|    P000604|2012-11-15|          0|P000604-0|      rep|

In [27]:
term_types_df = joined_first_term_df.withColumn(
    'term_end',F.when(F.col('term_type')=="rep", 
                      F.add_months(F.col('term_start'), 2*12))
                .when(F.col('term_type')=="sen", 
                      F.add_months(F.col('term_start'), 6*12))
                      )
term_types_df.select('id_bioguide','first_term','term_start','term_end').orderBy('id_bioguide').show(3)

+-----------+----------+----------+----------+
|id_bioguide|first_term|term_start|  term_end|
+-----------+----------+----------+----------+
|    A000001|1951-01-03|1951-01-03|1953-01-03|
|    A000002|1947-01-03|1947-01-03|1949-01-03|
|    A000002|1947-01-03|1953-01-03|1955-01-03|
+-----------+----------+----------+----------+
only showing top 3 rows



In [28]:
spark.sql("""
    SELECT 
          a.id_bioguide, 
          a.first_term,
          b.term_start,
          case  when b.term_type = 'rep' then b.term_start + interval '2 years'
                when b.term_type = 'sen' then b.term_start + interval '6 years'
          end as term_end
    FROM
        (
        SELECT 
            id_bioguide, 
            min(term_start) as first_term
        FROM legislators_terms
        GROUP BY 1
        ) a
    JOIN legislators_terms b on a.id_bioguide = b.id_bioguide;
""").show(3)

+-----------+----------+----------+----------+
|id_bioguide|first_term|term_start|  term_end|
+-----------+----------+----------+----------+
|    M001163|2005-03-10|2005-03-10|2007-03-10|
|    P000604|2012-11-15|2012-11-15|2014-11-15|
|    C001115|2018-07-10|2018-07-10|2020-07-10|
+-----------+----------+----------+----------+
only showing top 3 rows



This block of code can then be plugged into the retention code to derive the period
and pct_retained. The drawback to this method is that it fails to capture instances in
which a legislator did not complete a full term, which can happen in the event of
death or appointment to a higher office.

A second option is to use the subsequent starting date, minus one day, as the
term_end date. 

In [29]:
window_spec = Window.partitionBy("id_bioguide").orderBy("term_start")

df_term_end = joined_first_term_df.withColumn('term_end',F.date_sub(F.lead("term_start", 1).over(window_spec), 1))
df_term_end.select('id_bioguide','first_term','term_start','term_end').show(3)

+-----------+----------+----------+----------+
|id_bioguide|first_term|term_start|  term_end|
+-----------+----------+----------+----------+
|    A000001|1951-01-03|1951-01-03|      NULL|
|    A000002|1947-01-03|1947-01-03|1949-01-02|
|    A000002|1947-01-03|1949-01-03|1951-01-02|
+-----------+----------+----------+----------+
only showing top 3 rows



This code block can then be plugged into the retention code. This method has a couple of drawbacks. First, when there is no subsequent term, the lead function returns
null, leaving that term without a term_end. A default value, such as a default interval
shown in the last example, could be used in such cases. The second drawback is that
this method assumes that terms are always consecutive, with no time spent out of
office. Although most legislators tend to serve continuously until their congressional
careers end, there are certainly examples of gaps between terms spanning several
years.

Any time we make adjustments to fill in missing data, we need to be careful about the
assumptions we make. In subscription- or term-based contexts, explicit start and end
dates tend to be most accurate. Either of the two other methods shown—adding a
fixed interval or setting the end date relative to the next start date—can be used when
no end date is present and we have a reasonable expectation that most customers or
users will stay for the duration assumed.

Now that we’ve seen how to calculate a basic retention curve and correct for missing
dates, we can start adding in cohort groups. Comparing retention between different
groups is one of the main reasons to do cohort analysis. Next, I’ll discuss forming
groups from the time series itself, and after that, I’ll discuss forming cohort groups
from data in other tables.

### Cohorts Derived from the Time Series Itself

time-based cohorts based on the first date,

In [30]:
first_term_year_df = df_legislators_terms.groupBy('id_bioguide').agg(F.min('term_start').alias('first_term'))
first_term_year_df.show(3)

+-----------+----------+
|id_bioguide|first_term|
+-----------+----------+
|    M001163|2005-03-10|
|    P000604|2012-11-15|
|    C001115|2018-07-10|
+-----------+----------+
only showing top 3 rows



In [31]:
joined_first_term_year_df = first_term_year_df.join(df_legislators_terms.alias('b'), on='id_bioguide', how='inner')
joined_first_term_year_df = joined_first_term_year_df.join(date_dim_df.alias('c'), 
                                                           on= [(F.col("c.date").between(F.col("b.term_start"),F.col("b.term_end"))),
                                                                (F.col("c.month_name")=="December"),
                                                                (F.col("c.day_of_month")==31)
                                                                ], 
                                                                how="left")
joined_first_term_year_df.show(3)

+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+--------------------+------------+----+------------+--------------------+----------+-------+------+----------+--------+------------+-----------+-----------+--------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+
|id_bioguide|first_term|term_number|  term_id|term_type|term_start|  term_end|state|district|class|   party| how| url|             address|       phone| fax|contact_form|              office|state_rank|rss_url|caucus|      date|date_key|day_of_month|day_of_year|day_of_week|day_name|day_short_name|week_number|week_of_month|      week|month_number|month_name|month_short_name|first_day_of_month|last_day_of_month|quarter_number|quarter_name|first_day_of_quarter|last_day_of_quarter|year|decade|cen

In [32]:
cohort_retained_year_df = joined_first_term_year_df.withColumn(
    'first_year',
    F.year(F.col('first_term'))
).withColumn(
    'period',
    F.floor(F.months_between(F.col('date'), F.col('first_term')) / 12)
).withColumn(
    'period',
    F.coalesce(F.col('period'), F.lit(0))
).groupBy('first_year', 'period').agg(
    F.countDistinct("id_bioguide").alias("cohort_retained")
).sort('first_year', 'period')
cohort_retained_year_df.show(3)


+----------+------+---------------+
|first_year|period|cohort_retained|
+----------+------+---------------+
|      1789|     0|             89|
|      1789|     1|             89|
|      1789|     2|             57|
+----------+------+---------------+
only showing top 3 rows



In [33]:
window_spec = Window.partitionBy('first_year').orderBy('period')
cohort_size_year_df = cohort_retained_year_df.withColumn('cohort_size',
    F.first_value(F.col('cohort_retained')).over(window_spec).alias('cohort_size'))

cohort_pct_year_df = cohort_size_year_df.withColumn('pct_retained', 
    F.col('cohort_retained')/F.col('cohort_size'))

cohort_pct_year_df.orderBy('first_year').show(3)

+----------+------+---------------+-----------+------------------+
|first_year|period|cohort_retained|cohort_size|      pct_retained|
+----------+------+---------------+-----------+------------------+
|      1789|     0|             89|         89|               1.0|
|      1789|     1|             89|         89|               1.0|
|      1789|     2|             57|         89|0.6404494382022472|
+----------+------+---------------+-----------+------------------+
only showing top 3 rows



In [34]:
spark.sql("""
SELECT 
          first_year,
          period,
          first_value(cohort_retained) over (order by period) as cohort_size,
          cohort_retained,
          cohort_retained * 1.0 / first_value(cohort_retained) over (order by period) as pct_retained
FROM
        (
        SELECT 
          date_part('year',a.first_term) as first_year,
          coalesce(floor(months_between(c.date, a.first_term) / 12), 0) as period,
          count(distinct a.id_bioguide) as cohort_retained
        FROM
                (
                SELECT 
                        id_bioguide, 
                        min(term_start) as first_term
                FROM legislators_terms
                GROUP BY 1
                ) a
        JOIN legislators_terms b on a.id_bioguide = b.id_bioguide
        LEFT JOIN date_dim c on c.date between b.term_start and b.term_end
        and c.month_name = 'December' and c.day_of_month = 31
        GROUP BY 1,2
        ORDER BY 1
        ) aa
;

        """).show(3)

+----------+------+-----------+---------------+------------------+
|first_year|period|cohort_size|cohort_retained|      pct_retained|
+----------+------+-----------+---------------+------------------+
|      1789|     0|         89|             89|1.0000000000000000|
|      1790|     0|         89|              6|0.0674157303370787|
|      1791|     0|         89|             37|0.4157303370786517|
+----------+------+-----------+---------------+------------------+
only showing top 3 rows



### Cohort Retention by First State

In [35]:
window_spec_state = Window.partitionBy('id_bioguide').orderBy('term_start')
window_spec_term = Window.partitionBy('id_bioguide')
first_state_df = df_legislators_terms.withColumn('first_term',F.min('term_start').over(window_spec_term)
                                                 ).withColumn(
                                                     'first_state', F.first_value('state').over(window_spec_state)
                                                 ).select("id_bioguide", "term_start", "term_end", "state", "first_term", "first_state")

first_state_df.show(3)

+-----------+----------+----------+-----+----------+-----------+
|id_bioguide|term_start|  term_end|state|first_term|first_state|
+-----------+----------+----------+-----+----------+-----------+
|    A000001|1951-01-03|1953-01-03|   ND|1951-01-03|         ND|
|    A000002|1947-01-03|1949-01-03|   VA|1947-01-03|         VA|
|    A000002|1949-01-03|1951-01-03|   VA|1947-01-03|         VA|
+-----------+----------+----------+-----+----------+-----------+
only showing top 3 rows



In [36]:
joined_state_df = first_state_df.alias("b").join(date_dim_df.alias("c"),
                                                 on = [
                                                (F.col("c.date").between(F.col("b.term_start"), F.col("b.term_end"))),
                                                (F.col("c.month_name") == "December"),
                                                (F.col("c.day_of_month") == 31)
                                                    ],
                                                how="left")
joined_state_df.show(3)

+-----------+----------+----------+-----+----------+-----------+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+
|id_bioguide|term_start|  term_end|state|first_term|first_state|      date|date_key|day_of_month|day_of_year|day_of_week| day_name|day_short_name|week_number|week_of_month|      week|month_number|month_name|month_short_name|first_day_of_month|last_day_of_month|quarter_number|quarter_name|first_day_of_quarter|last_day_of_quarter|year|decade|century|
+-----------+----------+----------+-----+----------+-----------+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+---------

In [37]:
period_first_state = joined_state_df.withColumn('period',
                                             F.coalesce(F.year(F.col("c.date")) - F.year(F.col("b.first_term")),
                                                        F.lit(0))
                                                )
period_first_state.show(3)

+-----------+----------+----------+-----+----------+-----------+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+------+
|id_bioguide|term_start|  term_end|state|first_term|first_state|      date|date_key|day_of_month|day_of_year|day_of_week| day_name|day_short_name|week_number|week_of_month|      week|month_number|month_name|month_short_name|first_day_of_month|last_day_of_month|quarter_number|quarter_name|first_day_of_quarter|last_day_of_quarter|year|decade|century|period|
+-----------+----------+----------+-----+----------+-----------+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+--------

In [38]:
cohort_retained_state_df = period_first_state.groupBy('first_state','period').agg(F.count_distinct('id_bioguide').alias('cohort_retained'))
cohort_retained_state_df.show(3)

+-----------+------+---------------+
|first_state|period|cohort_retained|
+-----------+------+---------------+
|         PA|    12|            139|
|         IN|     0|            353|
|         WA|    27|              9|
+-----------+------+---------------+
only showing top 3 rows



In [39]:
window_spec_state = Window.partitionBy('first_state').orderBy('period')
cohort_size_state_df = cohort_retained_state_df.withColumn('cohort_size',
                                                           F.first_value('cohort_retained').over(window_spec_state))
cohort_size_state_df.show(3)

+-----------+------+---------------+-----------+
|first_state|period|cohort_retained|cohort_size|
+-----------+------+---------------+-----------+
|         AK|     0|             19|         19|
|         AK|     1|             19|         19|
|         AK|     2|             15|         19|
+-----------+------+---------------+-----------+
only showing top 3 rows



In [40]:
cohort_pct_state_df = cohort_size_state_df.withColumn('pct_retained',
                                                      F.try_divide(F.col('cohort_retained'),F.col('cohort_size'))
                                                      )
cohort_pct_state_df.show(3)

+-----------+------+---------------+-----------+------------------+
|first_state|period|cohort_retained|cohort_size|      pct_retained|
+-----------+------+---------------+-----------+------------------+
|         AK|     0|             19|         19|               1.0|
|         AK|     1|             19|         19|               1.0|
|         AK|     2|             15|         19|0.7894736842105263|
+-----------+------+---------------+-----------+------------------+
only showing top 3 rows



In [41]:
spark.sql("""

        SELECT 
          first_state, 
          period,first_value(cohort_retained) over (partition by first_state order by period) as cohort_size,
          cohort_retained,
          cohort_retained / first_value(cohort_retained) over (partition by first_state order by period) as pct_retained
        FROM
            (
            SELECT 
                a.first_state,
                coalesce(floor(months_between(c.date, a.first_term) / 12), 0) as period,
                count(distinct a.id_bioguide) as cohort_retained
            FROM
            (
                SELECT 
                    distinct id_bioguide,
                    min(term_start) over (partition by id_bioguide) as first_term,
                    first_value(state) over (partition by id_bioguide order by term_start) as first_state
                FROM legislators_terms
                ) a
        JOIN legislators_terms b on a.id_bioguide = b.id_bioguide
        LEFT JOIN date_dim c on c.date between b.term_start and b.term_end
        and c.month_name = 'December' and c.day_of_month = 31
        GROUP BY 1,2
            ) aa
            ;

  """).show(3)

+-----------+------+-----------+---------------+------------------+
|first_state|period|cohort_size|cohort_retained|      pct_retained|
+-----------+------+-----------+---------------+------------------+
|         AK|     0|         19|             19|               1.0|
|         AK|     1|         19|             19|               1.0|
|         AK|     2|         19|             15|0.7894736842105263|
+-----------+------+-----------+---------------+------------------+
only showing top 3 rows



### Defining the Cohort from a Separate Table

In [42]:
gender_df = first_term_and_date_dim_df.join(df_legislators, on='id_bioguide',how='inner')
gender_df.show(3)

+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+--------------------+------------+----+------------+--------------------+----------+-------+------+----------+--------+------------+-----------+-----------+--------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+--------------------+----------+---------+-----------+--------+------+---------------+------------------+----------------+----------+------+----------------------+-----------+--------+----------------+-----------+-------------------+----------------+--------------------------+---------+--------+------------+------+--------------+--------------+---------+--------+--------+
|id_bioguide|first_term|term_number|  term_id|term_type|term_start|  term_end|state|district|class|   party| how| url|     

In [43]:
cohort_retained_gender_df = gender_df.withColumn(
    "period",
    F.coalesce(
        (F.months_between(F.col("date"), F.col("first_term")) / 12).cast("int"),
        F.lit(0)
    )
).groupBy("gender","period").agg(F.countDistinct("id_bioguide").alias("cohort_retained"))
cohort_retained_gender_df.orderBy('period').show(3)

+------+------+---------------+
|gender|period|cohort_retained|
+------+------+---------------+
|     F|     0|            366|
|     M|     0|          12152|
|     M|     1|          11979|
+------+------+---------------+
only showing top 3 rows



In [44]:
window_spec_gender = Window.partitionBy('gender').orderBy('period')
cohort_pct_gender_df = cohort_retained_gender_df.withColumn(
   'cohort_size',F.first_value('cohort_retained').over(window_spec_gender)
).withColumn(
    'pct_change', F.try_divide('cohort_retained','cohort_size')
).orderBy('period').show(4)

+------+------+---------------+-----------+------------------+
|gender|period|cohort_retained|cohort_size|        pct_change|
+------+------+---------------+-----------+------------------+
|     F|     0|            366|        366|               1.0|
|     M|     0|          12152|      12152|               1.0|
|     F|     1|            349|        366| 0.953551912568306|
|     M|     1|          11979|      12152|0.9857636603028308|
+------+------+---------------+-----------+------------------+
only showing top 4 rows



### The first female legislator did not take office until 1917, so let's use that year for MALE and FEMALE comparision.

In [45]:
gender_df_1917 = gender_df.filter(F.col('first_term').between('1917-01-01','1999-12-31'))
gender_df_1917.show(3)

+-----------+----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+----+-------+-----+----+------------+------+----------+-------+------+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+--------------------+----------+---------+-----------+--------+------+---------------+------------------+----------------+----------+------+----------------------+-----------+--------+-----------------+-----------+-------------------+----------------+--------------------------+---------+--------+------------+------+--------------+--------------+--------+--------+--------+
|id_bioguide|first_term|term_number|  term_id|term_type|term_start|  term_end|state|district|class|     party| how| url|address|phone| fax|contact_form|of

In [46]:
cohort_retained_gender_df_1917 = gender_df_1917.withColumn(
    "period",
    F.coalesce(
        (F.months_between(F.col("date"), F.col("first_term")) / 12).cast("int"),
        F.lit(0)
    )
).groupBy("gender","period").agg(F.countDistinct("id_bioguide").alias("cohort_retained"))
cohort_retained_gender_df_1917.orderBy('period').show(3)

+------+------+---------------+
|gender|period|cohort_retained|
+------+------+---------------+
|     F|     0|            200|
|     M|     0|           3833|
|     M|     1|           3769|
+------+------+---------------+
only showing top 3 rows



In [47]:
window_spec_gender = Window.partitionBy('gender').orderBy('period')
cohort_pct_gender_df_1917 = cohort_retained_gender_df_1917.withColumn(
   'cohort_size',F.first_value('cohort_retained').over(window_spec_gender)
).withColumn(
    'pct_change', F.try_divide('cohort_retained','cohort_size')
).orderBy('period').show(4)

+------+------+---------------+-----------+------------------+
|gender|period|cohort_retained|cohort_size|        pct_change|
+------+------+---------------+-----------+------------------+
|     F|     0|            200|        200|               1.0|
|     M|     0|           3833|       3833|               1.0|
|     F|     1|            187|        200|             0.935|
|     M|     1|           3769|       3833|0.9833028959039917|
+------+------+---------------+-----------+------------------+
only showing top 4 rows



In [48]:
spark.sql("""
SELECT 
    gender,
    period,
    first_value(cohort_retained) OVER (
        PARTITION BY gender ORDER BY period
    ) AS cohort_size,
    cohort_retained,
    cohort_retained * 1.0 / first_value(cohort_retained) OVER (
        PARTITION BY gender ORDER BY period
    ) AS pct_retained
FROM (
    SELECT 
        d.gender,
        coalesce(floor(months_between(c.date, a.first_term) / 12), 0) AS period,
        count(DISTINCT a.id_bioguide) AS cohort_retained
    FROM (
        SELECT 
            id_bioguide, 
            min(term_start) AS first_term
        FROM legislators_terms
        GROUP BY id_bioguide
    ) a
    JOIN legislators_terms b 
        ON a.id_bioguide = b.id_bioguide
    LEFT JOIN date_dim c 
        ON c.date BETWEEN b.term_start AND b.term_end
       AND c.month_name = 'December' 
       AND c.day_of_month = 31
    JOIN legislators d 
        ON a.id_bioguide = d.id_bioguide
    WHERE a.first_term BETWEEN '1917-01-01' AND '1999-12-31'
    GROUP BY d.gender, coalesce(floor(months_between(c.date, a.first_term) / 12), 0)
) aa
""").show(3)

+------+------+-----------+---------------+------------------+
|gender|period|cohort_size|cohort_retained|      pct_retained|
+------+------+-----------+---------------+------------------+
|     F|     0|        200|            200|1.0000000000000000|
|     F|     1|        200|            187|0.9350000000000000|
|     F|     2|        200|            149|0.7450000000000000|
+------+------+-----------+---------------+------------------+
only showing top 3 rows



### Dealing with Sparse Cohorts


let’s attempt to cohort female legislators by the first state they
represented to see if there are any differences in retention.

In [49]:
window_spec_first_term = Window.partitionBy('id_bioguide')
window_spec_first_state = Window.partitionBy('id_bioguide').orderBy('term_start')
gender_first_state_df = df_legislators_terms.withColumn('first_term',
                                                              F.min(F.col('term_start')).over(window_spec_first_term)
                                                        ).withColumn('first_state',
                                                                     F.first_value(F.col('state')).over(window_spec_first_state)
                                                                     )
gender_first_state_df = (
    gender_first_state_df
    .select("id_bioguide", "first_term", "first_state")
    .distinct()
)
gender_first_state_df.show(3)

+-----------+----------+-----------+
|id_bioguide|first_term|first_state|
+-----------+----------+-----------+
|    A000001|1951-01-03|         ND|
|    A000002|1947-01-03|         VA|
|    A000003|1817-12-01|         GA|
+-----------+----------+-----------+
only showing top 3 rows



In [50]:
gender_first_state_df = (
    gender_first_state_df.alias("a")
    .join(
        df_legislators_terms.alias("b"),
        on="id_bioguide",
        how="inner"
    )
)
gender_first_state_df.show(3)

+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+--------------------+--------------------+------------+----+------------+--------------------+----------+-------+------+
|id_bioguide|first_term|first_state|term_number|  term_id|term_type|term_start|  term_end|state|district|class|     party| how|                 url|             address|       phone| fax|contact_form|              office|state_rank|rss_url|caucus|
+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+--------------------+--------------------+------------+----+------------+--------------------+----------+-------+------+
|    M001163|2005-03-10|         CA|          0|M001163-0|      rep|2005-03-10|2007-01-03|   CA|       5| NULL|  Democrat|NULL|                NULL|                NULL|        NULL|NULL|        NULL|                NULL|      NULL|   NULL|  NULL|
|    P00

In [51]:
gender_first_state_date_df = (
    gender_first_state_df.alias("b")
    .join(
        date_dim_df.alias("c"),
        on=[
        (F.col("c.date").between(F.col("b.term_start"), F.col("b.term_end"))),
        (F.col("c.month_name") == "December"),
        (F.col("c.day_of_month") == 31)
    ],
    how="left")
    )
gender_first_state_date_df.show(3)

+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+--------------------+------------+----+------------+--------------------+----------+-------+------+----------+--------+------------+-----------+-----------+--------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+
|id_bioguide|first_term|first_state|term_number|  term_id|term_type|term_start|  term_end|state|district|class|   party| how| url|             address|       phone| fax|contact_form|              office|state_rank|rss_url|caucus|      date|date_key|day_of_month|day_of_year|day_of_week|day_name|day_short_name|week_number|week_of_month|      week|month_number|month_name|month_short_name|first_day_of_month|last_day_of_month|quarter_number|quarter_name|first_day_of_quarter|last_day_of

In [52]:
gender_first_state_date_df = (
    gender_first_state_date_df.alias("c")
    .join(
        df_legislators.alias("d"),
        on = "id_bioguide",
        how = "inner"
    )
)
gender_first_state_date_df.show(3)

+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+--------+----+----+--------------------+------------+----+------------+--------------------+----------+-------+------+----------+--------+------------+-----------+-----------+--------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+--------------------+----------+---------+-----------+--------+------+---------------+------------------+----------------+----------+------+----------------------+-----------+--------+----------------+-----------+-------------------+----------------+--------------------------+---------+--------+------------+------+--------------+--------------+---------+--------+--------+
|id_bioguide|first_term|first_state|term_number|  term_id|term_type|term_start|  term_end|state|district|class|

In [53]:
gender_first_state_date_df = gender_first_state_date_df.filter(F.col("first_term").between('1917-01-01','1999-12-31'))
gender_first_state_date_df.show(3)

+-----------+----------+-----------+-----------+---------+---------+----------+----------+-----+--------+-----+----------+----+----+-------+-----+----+------------+------+----------+-------+------+----------+--------+------------+-----------+-----------+---------+--------------+-----------+-------------+----------+------------+----------+----------------+------------------+-----------------+--------------+------------+--------------------+-------------------+----+------+-------+--------------------+----------+---------+-----------+--------+------+---------------+------------------+----------------+----------+------+----------------------+-----------+--------+-----------------+-----------+-------------------+----------------+--------------------------+---------+--------+------------+------+--------------+--------------+--------+--------+--------+
|id_bioguide|first_term|first_state|term_number|  term_id|term_type|term_start|  term_end|state|district|class|     party| how| url|address|ph

In [54]:
cohort_retained_state_gender_df = gender_first_state_date_df.withColumn(
    'period', F.coalesce(F.year(F.col("date")) - F.year(F.col("first_term")), F.lit(0))
)
cohort_retained_state_gender_df = cohort_retained_state_gender_df.groupBy(
    'first_state','gender','period'
).agg(F.count_distinct('id_bioguide').alias('cohort_retained'))
cohort_retained_state_gender_df.show(3)

+-----------+------+------+---------------+
|first_state|gender|period|cohort_retained|
+-----------+------+------+---------------+
|         WA|     F|     8|              5|
|         NY|     M|    12|            111|
|         MO|     M|     8|             48|
+-----------+------+------+---------------+
only showing top 3 rows



In [55]:
window_state_gender_cohort = Window.partitionBy('first_state','gender').orderBy('period')
cohort_pct_state_gender_df = cohort_retained_state_gender_df.withColumn(
    "cohort_size", F.first_value(F.col("cohort_retained")).over(window_state_gender_cohort)
).withColumn(
    "pct_change", F.try_divide(F.col("cohort_retained"),F.col("cohort_size"))
)
cohort_pct_state_gender_df.show(3)

+-----------+------+------+---------------+-----------+------------------+
|first_state|gender|period|cohort_retained|cohort_size|        pct_change|
+-----------+------+------+---------------+-----------+------------------+
|         AK|     M|     0|             13|         13|               1.0|
|         AK|     M|     1|             13|         13|               1.0|
|         AK|     M|     2|             11|         13|0.8461538461538461|
+-----------+------+------+---------------+-----------+------------------+
only showing top 3 rows



In [56]:
spark.sql("""
    SELECT first_state, gender, period
,first_value(cohort_retained) over (partition by first_state, gender
 order by period) as cohort_size
,cohort_retained
,cohort_retained /
 first_value(cohort_retained) over (partition by first_state, gender
 order by period) as pct_retained
FROM
(
 SELECT a.first_state, d.gender
 ,coalesce(floor(months_between(c.date, a.first_term) / 12), 0) AS period
 ,count(distinct a.id_bioguide) as cohort_retained
 FROM
 (
 SELECT distinct id_bioguide
 ,min(term_start) over (partition by id_bioguide) as first_term
 ,first_value(state) over (partition by id_bioguide
 order by term_start) as first_state
 FROM legislators_terms
 ) a
 JOIN legislators_terms b on a.id_bioguide = b.id_bioguide
 LEFT JOIN date_dim c on c.date between b.term_start and b.term_end
 and c.month_name = 'December' and c.day_of_month = 31
 JOIN legislators d on a.id_bioguide = d.id_bioguide
 WHERE a.first_term between '1917-01-01' and '1999-12-31'
 GROUP BY 1,2,3
) aa
;
""").show(5)

+-----------+------+------+-----------+---------------+------------------+
|first_state|gender|period|cohort_size|cohort_retained|      pct_retained|
+-----------+------+------+-----------+---------------+------------------+
|         AK|     M|     0|         13|             13|               1.0|
|         AK|     M|     1|         13|             13|               1.0|
|         AK|     M|     2|         13|             11|0.8461538461538461|
|         AK|     M|     3|         13|             11|0.8461538461538461|
|         AK|     M|     4|         13|              9|0.6923076923076923|
+-----------+------+------+-----------+---------------+------------------+
only showing top 5 rows



### Handling NULLS in retention
 let’s look at how to ensure a record for every period so that the query returns
zero values for retention instead of nulls

In [70]:
# generate periods 0 to 20
generate_period_df = spark.range(0, 21)

In [71]:
cohort_retained_null_df = generate_period_df.crossJoin(cohort_retained_state_gender_df)
cohort_retained_null_df.show(3)

+---+-----------+------+------+---------------+
| id|first_state|gender|period|cohort_retained|
+---+-----------+------+------+---------------+
|  0|         WA|     F|     8|              5|
|  1|         WA|     F|     8|              5|
|  2|         WA|     F|     8|              5|
+---+-----------+------+------+---------------+
only showing top 3 rows



In [69]:
handling_null_retention_df = (
    cohort_retained_null_df.alias("a")
    .join(
        cohort_retained_state_gender_df.alias("b"),
        (F.col("a.gender") == F.col("b.gender")) &
        (F.col("a.first_state") == F.col("b.first_state")) &
        (F.col("a.period") == F.col("b.period")),
        how="left"
    ).withColumn("cohort_retained", F.coalesce(F.col("cohort_retained"), F.lit(0)))
    .withColumn("pct_retained", F.col("cohort_retained") / F.col("cohort_size"))
)
handling_null_retention_df.show()

AnalysisException: [AMBIGUOUS_REFERENCE] Reference `a`.`period` is ambiguous, could be: [`a`.`period`, `a`.`period`].