In [1]:
import os
import math
# conda remove numpy numba --force
# numpy should be grade-downed
# conda install numpy=1.26 numba=0.59
import numpy as np
import pandas as pd
import time
from multiprocessing import Pool 
from pathlib import Path
import sys

In [2]:
from __future__ import division # In Python 3, this future import is unnecessary because true division (/) is the default
from numba import cuda, float32

In [3]:
# Read hourly wind estimates extracted for all residences in the birth year
# Output is array of tuples.
# Each tuple contains sorted hourly wind estimates and unique ids
def readRawData(year):
    windTuples = [] # each tuple contains wind estimates and unique ids for 1000 births
    windData= pd.read_csv(WIND_FOLDER + "/comb_" + str(year) + ".csv")
    print("finished loading wind data")
    uniqueIds = list(set(windData['master_id'])) # all unique ids in the curr batch
    numIds = len(uniqueIds) 
    curIndex = 0

    # for each 1000 unique ids, get wind records for those ids, sort the ids and wind records
    # in the same order, and create a tuple.  Adjust the month variable to be relative to the first 
    # month of coverage (january year before birth)
    while(curIndex < numIds):
        subsetids = uniqueIds[curIndex: min(curIndex+1000,numIds)] # get next 1000 unique ids
        subsetData = windData[windData['master_id'].isin(subsetids)] # get records for this subset of ids
        subsetData.sort_values(by=['master_id'],inplace=True) # sort wind records by unique ids
        subsetids.sort() # sort unique ids
        
        # for records in the year of the birth, add 12 to the months value.  This makes the month variable
        # relative to the first month of coverage (january year before birth)
        # IN OUT PROJECT, WE SHOULD NOT DO THAT!!!!!
        #tempVals = (subsetData['year'] - year +1)*12 # 
        tempVals = (subsetData['year'] - year)*12 # 
        subsetData['month'] = subsetData['month'] + tempVals
        
        windTuples.append((subsetData,subsetids))
        curIndex+=1000 # go to next batch of 1000 unique ids
        print(curIndex)
    print("completed partitioning wind data")
    return(windTuples)

In [4]:
# create a uniqueId->index map.  Called in combination with the python map function
def mapIds(idToMap):
    return masterList.index(idToMap)

In [5]:
# Installs only the CUDA runtime libraries inside your Conda environment
#conda install cudatoolkit
# Install nvvm.dll and the full CUDA toolkit from NVIDIA
# https://developer.nvidia.com/cuda-downloads
# locate CUDA-related executables
# Command Prompt: where nvcc
# I did not have to define the path, but reinstalled Numpy and Cuda

In [6]:
# constants necessary for GPU oeprations.  MUST BE SET/HARD-CODED BEFORE RUNTIME!!!!
N_ANGLES = 360
N_BIRTHS = 624 # input data is preprocessed into batch sizes of 1000
thread_x_dim = math.ceil(360/128)*128 # number of threads per streaming multiprocessor
thread_dim = (thread_x_dim,1) # cuda requires 2d tuple
block_x_dim = 24 # months in 2 years, one of the dimensions of the output wind matrix
block_y_dim = 31 # max days in month, one ofthe dimensions of the output wind matrix
block_z_dim = N_BIRTHS # number of 
block_dim = (block_x_dim,block_y_dim,block_z_dim)
TOTAL_THREADS = thread_x_dim*block_x_dim*block_y_dim*block_z_dim

In [7]:
# cuda function.  Must be compiled before runtime (@cuda.jit decorator)

# rearrange data so hourly records in the matrix are organized by:
#      matrix[hourIndex][dayIndex][monthIndex][uniqueIdIndex]
# INPUTS:
#    idList (numpy array) - indeces for unique ids
#    hour (numpy arrray) - hour of the wind direction measures
#    day (numpy array) - day of the wind direction measures
#    month (numpy array) - month of the wind direction measures
#    wind_dir (numpy array) - wind direction measures
#    outData (numpy array) - stores reorganized wind direction measures
#    maxThread (int) - number of wind direction measures
@cuda.jit
def rearrangeData(idList,hour,day,month,wnd_dir,outData,maxThread):
    tid = cuda.blockIdx.x*cuda.blockDim.x + cuda.threadIdx.x
    if(tid < maxThread):
        outData[hour[tid],day[tid]-1,month[tid]-1,idList[tid]] = wnd_dir[tid]
    

# calculate number of hours each radial degree is upwind of the maternal residence
# results are stored in place in the output dataset
# INPUTS:
#    inputData (numpy array) - hourly wnd direction matrix
#    outputData (numpy array) - where results are stored in place
@cuda.jit
def calcDailyDownwind(inputData,outputData):
    
    
    # inline function.  Must be within compiled function so it can be sent to the 
    # GPU driver and implemented by each streaming multiprocessor at runtime
    # test if a radial degree is within the +/- 15 degree angular window for being downwind
    # INPUTS:
    #    val1 (int) - radial degree
    #    val2 (int) - wind direction
    # OUTPUTS:
    #     True if radial degree is within 15 degrees of wind direction. False otherwise.
    def isDownwind(val1,val2):
        diff1 = val1 - val2 if val1-val2 > 0 else (val1-val2 + 360)
        diff2 = val2 - val1 if val2-val1 >0 else (val2 - val1 + 360)
        minDiff = min(diff1,diff2)
        if(minDiff < 15):
            return 1
        return 0
    
    # each of the 360 degrees is assigned a unique thread id along the x axis of the cuda block
    angle = cuda.threadIdx.x
    
    # cuda has thousands of threads per sm along the x axis.  No need to go beyond 360
    if(angle >= 360):
        return

    # important that these dimensions/indeces match those assigned in the funtion 'rearrange data'
    day = cuda.blockIdx.y
    month = cuda.blockIdx.x
    station = cuda.blockIdx.z

    # for each hour in 24 hours of a day, test if the current radial degree is downwind of the wind direction
    # add to the hours downwind if yes, do not change hoursDownwind otherwise
    hoursDownwind = 0
    for hourIndex in range(24):
        wndAngle = inputData[hourIndex,day,month,station]
        if(wndAngle>=0):
            hoursDownwind += isDownwind(angle,wndAngle)
        else:
            hoursDownwind = -999
    
    # WARNING: IT IS ESSENTIAL THAT EACH THREAD HAS GUARANTEED EXCLUSIVE ACCESS TO IT's INDEX IN THE 
    # outputData matrix.  One thread, one index exactly.

    # update the output dataset in place
    outputData[station,month,day,angle] = hoursDownwind

In [8]:
#BIRTH_FOLDER = 'C:/Users/shiroa1/OneDrive - VUMC/Geocoding/Birth_Addresses_Wind/births_by_year/'
WIND_FOLDER = 'C:/Users/shiroa1/OneDrive - VUMC/Geocoding/WIND_FOLDER/'
YEAR = 2010

In [9]:
windTuples = readRawData(YEAR)
index = 0

finished loading wind data
1000
completed partitioning wind data


In [10]:
windPartitions = 'C:/Users/shiroa1/OneDrive - VUMC/Geocoding/WIND_FOLDER/windPartitions/' + str(YEAR)
if not os.path.exists(windPartitions):
    os.makedirs(windPartitions)

In [11]:
masterIds = 'C:/Users/shiroa1/OneDrive - VUMC/Geocoding/WIND_FOLDER/masterIds/' + str(YEAR)
if not os.path.exists(masterIds):
    os.makedirs(masterIds)

In [11]:
# for each batch of 1000 maternal residences
# rearrange data into matrix organized by day, month, year, and unique id
# calculate daily number of hours downwind for all 360 degrees 
# save data as npy file
for tupleVal in windTuples:
    outputFile = WIND_FOLDER + 'windPartitions/' + str(YEAR) + '/w_' + str(index) + '.npy' # should remove an extra space, not '/w_ '
    
    # no need to process data if output file was already created
    if not(os.path.exists(outputFile)):
        print("starting index %i" %(index))

        windData, masterList = tupleVal[0], tupleVal[1]

        # get index for each unique id
        inputIds = list(map(mapIds,windData['master_id']))

        # transfer data from RAM to GPU memory.  Part of this process
        # includes creating variables on the GPU
        inputList = cuda.to_device(np.array(inputIds))
        cuda.synchronize()
        inputHour = cuda.to_device(np.array(windData['hour']))
        cuda.synchronize()
        inputDay = cuda.to_device(np.array(windData['day']))
        cuda.synchronize()
        inputMonth = cuda.to_device(np.array(windData['month']))
        cuda.synchronize()
        inputDir = cuda.to_device(np.array(windData['wnd_dir']))
        cuda.synchronize()        
        testInput = np.full((24,31,24,N_BIRTHS),-999,np.int16)
        outData = cuda.to_device(testInput)
        cuda.synchronize()
        nThreads = len(windData['hour'])
        cuda.synchronize()
        nBlocks = math.ceil(nThreads/thread_x_dim)
        cuda.synchronize()

        # rearrange wind direction so its organized in a matrix by day,month,year,unique id
        rearrangeData[(nBlocks,1),(thread_x_dim,1)](inputList,inputHour,inputDay,inputMonth,inputDir,outData,nThreads)
        cuda.synchronize()
        print("finished rearranging data")

        blankFile = np.full((N_BIRTHS,24,31,360),-999,np.float64) # blank output matrix.  CUDA writes in place
        cuda.synchronize()

        station_mem = cuda.to_device(blankFile) # version of blank file to store on the GPU
        cuda.synchronize()

        # calcualte number or hours downwind for each of 360 degrees, for each day, each maternal residence
        calcDailyDownwind[block_dim,(thread_dim)](outData,station_mem)
        print("calcualted daily averages")
        cuda.synchronize()
     
        # transfer results from GPU to RAM
        resStation = station_mem.copy_to_host()

        # convert results to pandas dataframe and save 
        with open(outputFile, 'wb') as f:
            np.save(f, resStation)
        masterDF = pd.DataFrame({
            'masterIds':masterList
        })
        masterDF.to_csv(WIND_FOLDER + "masterIds/" + str(YEAR) + "/id_" + str(index) + ".csv",index=False)
    index+=1    

starting index 0




finished rearranging data
calcualted daily averages


In [12]:
loaded_data = np.load("C:/Users/shiroa1/OneDrive - VUMC/Geocoding/WIND_FOLDER/windPartitions/2010/w_0.npy")
loaded_data.shape

(624, 24, 31, 360)

In [13]:
loaded_data

array([[[[   0.,    0.,    0., ...,    0.,    0.,    0.],
         [   8.,    7.,    5., ...,    9.,    8.,    9.],
         [  15.,   15.,   15., ...,   16.,   17.,   15.],
         ...,
         [   5.,    7.,    8., ...,    3.,    4.,    4.],
         [   8.,    8.,    8., ...,    7.,    8.,    7.],
         [   2.,    2.,    3., ...,    2.,    2.,    2.]],

        [[   0.,    0.,    0., ...,    0.,    0.,    0.],
         [   0.,    0.,    0., ...,    0.,    0.,    0.],
         [   3.,    2.,    2., ...,    3.,    3.,    3.],
         ...,
         [-999., -999., -999., ..., -999., -999., -999.],
         [-999., -999., -999., ..., -999., -999., -999.],
         [-999., -999., -999., ..., -999., -999., -999.]],

        [[   0.,    0.,    0., ...,    1.,    0.,    0.],
         [   6.,    7.,    7., ...,    4.,    4.,    4.],
         [   0.,    0.,    0., ...,    0.,    0.,    0.],
         ...,
         [   0.,    0.,    0., ...,    0.,    0.,    0.],
         [   2.,    2.,   

In [None]:
# Example

In [14]:
tupleVal = windTuples[0]

In [15]:
windData, masterList = tupleVal[0], tupleVal[1]

In [16]:
inputIds = list(map(mapIds,windData['master_id']))
inputIds

[0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,


In [17]:
inputList = np.array(inputIds)
inputList

array([0, 0, 0, ..., 1, 1, 1])

In [18]:
inputHour = np.array(windData['hour'])
inputHour.shape

(35040,)

In [25]:
inputDay = np.array(windData['day'])
inputDay.shape     

(1008,)

In [27]:
inputMonth = np.array(windData['month'])
inputMonth.shape       

(1008,)

In [28]:
inputDir = np.array(windData['wnd_dir'])
inputDir.shape      

(1008,)

In [29]:
testInput = np.full((24,31,24,N_BIRTHS),-999,np.int16)
testInput.shape    

(24, 31, 24, 624)

In [30]:
outData = testInput

In [31]:
nThreads = len(windData['hour'])
nThreads

1008

In [32]:
nBlocks = math.ceil(nThreads/thread_x_dim)
nBlocks

3

In [19]:
rearrangeData[(nBlocks,1),(thread_x_dim,1)](inputList,inputHour,inputDay,inputMonth,inputDir,outData,nThreads)



In [37]:
blankFile = np.full((N_BIRTHS,24,31,360),-999,np.float64)
blankFile.shape

(624, 24, 31, 360)

In [20]:
calcDailyDownwind[block_dim,(thread_dim)](outData,blankFile)

In [40]:
outData.shape

(24, 31, 24, 624)

In [None]:
def calcDailyDownwind(inputData,outputData):
    
    
    # inline function.  Must be within compiled function so it can be sent to the 
    # GPU driver and implemented by each streaming multiprocessor at runtime
    # test if a radial degree is within the +/- 15 degree angular window for being downwind
    # INPUTS:
    #    val1 (int) - radial degree
    #    val2 (int) - wind direction
    # OUTPUTS:
    #     True if radial degree is within 15 degrees of wind direction. False otherwise.
    def isDownwind(val1,val2):
        diff1 = val1 - val2 if val1-val2 > 0 else (val1-val2 + 360)
        diff2 = val2 - val1 if val2-val1 >0 else (val2 - val1 + 360)
        minDiff = min(diff1,diff2)
        if(minDiff < 15):
            return 1
        return 0
    
    # each of the 360 degrees is assigned a unique thread id along the x axis of the cuda block
    angle = cuda.threadIdx.x
    
    # cuda has thousands of threads per sm along the x axis.  No need to go beyond 360
    if(angle >= 360):
        return

    # important that these dimensions/indeces match those assigned in the funtion 'rearrange data'
    day = cuda.blockIdx.y
    month = cuda.blockIdx.x
    station = cuda.blockIdx.z

    # for each hour in 24 hours of a day, test if the current radial degree is downwind of the wind direction
    # add to the hours downwind if yes, do not change hoursDownwind otherwise
    hoursDownwind = 0
    for hourIndex in range(24):
        wndAngle = inputData[hourIndex,day,month,station]
        if(wndAngle>=0):
            hoursDownwind += isDownwind(angle,wndAngle)
        else:
            hoursDownwind = -999
    
    # WARNING: IT IS ESSENTIAL THAT EACH THREAD HAS GUARANTEED EXCLUSIVE ACCESS TO IT's INDEX IN THE 
    # outputData matrix.  One thread, one index exactly.

    # update the output dataset in place
    outputData[station,month,day,angle] = hoursDownwind