## Single Producer, Multiple Consumer MultiThreading RunQueue/Task Model
This is a single Producer thread, multiple Consumer thread Queueing model where a single Queue is worked on by two  Consumer threads.    

The Queue is a FIFO queue where the first tasks added are the first retrieved.

In [1]:
# List of modules to import. 
import threading
import queue
import pandas as pd
import sqlite3

# List of filenames to work on in tasks added to a Queue. 
NUMBER_OF_WORKERTHREADS = 2
FILENAME_1 = 'C:/Kal/Stat-Work/Stat-Code/Python/student_files/resources/baseball/Batting.csv'
FILENAME_2 = 'C:/Kal/Stat-Work/Stat-Code/Python/student_files/resources/boston_marathon_2017.csv'
FILENAME_3 = 'C:/Kal/Stat-Work/Stat-Code/Python/student_files/ch06_database/batting.db'
task_list = [FILENAME_1, FILENAME_2, FILENAME_3]

# Module function to do work on a task as identified by a filename.
def do_work(filename, i):
    if (filename == FILENAME_1):
        
        # Read the DataFrame from the CSV file.
        df = pd.read_csv(filename)
            
        # Fill in the missing values, replacing NA with 0.
        df.fillna(value=0, axis=None, inplace=True)
            
        # Add a new column for the batting average for each row.
        df['AVGS'] = df['AB']/df['H']
        # Fill in the missing values, replacing NA with 0.
        df.fillna(value=0, axis=None, inplace=True)
        #print('DataFrame : {0}'.format(df))
            
        # Compute the summary statistic for 50th percentile. 
        df2 = df.loc[(df['AB'] >= 502)]
        fifty = df2.describe()['AVGS']['50%']
        print('Thread {0}: i: Batting: 50% Average: {1:.3f}\n\n'.format(i, fifty))
        return df
        
    elif (filename == FILENAME_2):
        # Read the DataFrame from the CSV file.
        df = pd.read_csv(filename, sep=r',',
                         error_bad_lines=False, usecols=(0, 4, 6, 7),
                         names=['row', 'gender', 'state', 'country'])
        
        # Groupby the country and get 'USA' group.
        grouped_cntr = df.groupby('country').get_group('USA')
        grouped_sorted = grouped_cntr.groupby('state').size().sort_values(ascending=False)
        
        # Print the 4 states having most runners.
        print('Thread {0}: Marathon: Top States Having Most Runners: \n{1}\n\n'.format(i, grouped_sorted[0:3]))
        return grouped_sorted
        
    elif (filename == FILENAME_3):
        # Connect to the sqlite3 database and get the dataframe.
        with sqlite3.connect(FILENAME_3) as conn:
            df = pd.read_sql('SELECT hits, atbats FROM batting where year >= ?',
                            conn, params=['1957'])
            df = df[df.hits != 0]
            df = df[df.atbats >= 502]
            df['averages'] = df['hits'] / df['atbats']
        
        # Print the 50th percentile.
        print('Thread {0}: Batting db: \n{1}\n'.format(i, df.describe()))
        print('Thread {0}: Batting db: 50% Batting Average From SQLITE3 Database: {1}\n\n'
              .format(i, df.describe()['averages']['50%']))
        return df
    
# Module function to get tasks from the Queue for work and call task_done
# after end of work. 
def worker(i):
    while True:
        task = q.get()
        if task is None:
            break
        do_work(task, i)
        q.task_done()

# Framework starts.
print("Framework starts....\n\n")
        
# Create the Queue and an empty list of threads. 
q = queue.Queue()
threads = []

# Append three threads to the thread list after start.
for i in range(NUMBER_OF_WORKERTHREADS):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    threads.append(t)

# Append three 
for task in task_list:
    q.put(task)

# Block until all tasks are done.
q.join()

# Stop worker threads.
for i in range(NUMBER_OF_WORKERTHREADS):
    q.put(None)
   
# Block until all threads are done. 
for t in threads:
    t.join()
    
# Framework exits.
print("Framework exits....\n\n")

Framework starts....


Thread 1: Marathon: Top States Having Most Runners: 
state
MA    4586
CA    2049
NY    1324
dtype: int64


Thread 0: i: Batting: 50% Average: 3.531


Thread 1: Batting db: 
              hits       atbats     averages
count  5013.000000  5013.000000  5013.000000
mean    159.927987   569.186515     0.280551
std      21.198389    42.476059     0.026432
min      99.000000   502.000000     0.195266
25%     145.000000   535.000000     0.262267
50%     158.000000   565.000000     0.279630
75%     174.000000   600.000000     0.298188
max     262.000000   716.000000     0.387987

Thread 1: Batting db: 50% Batting Average From SQLITE3 Database: 0.2796296296296296


Framework exits....


